Java - Kryo serialization of a class (task object) in Apache Spark returns null on deserialization -


I am using the Java Spark API to write a test application. It uses a class that doesn't implement the Serializable interface, so to make the application work I am using the Kryo serializer to serialize that class. The problem I observed while debugging is that, during deserialization, the returned class object becomes null, which in turn throws a NullPointerException. It seems to be a closure problem where things are going wrong, but I am not sure. Since I am new to this kind of serialization, I don't know where to start digging.

Here is the code I am testing:

package org.apache.spark.examples;   import java.io.filewriter; import java.io.ioexception; import java.io.printwriter; import java.net.inetaddress; import java.net.unknownhostexception;  import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.api.java.javasparkcontext; import org.apache.spark.api.java.function.function;     /**  * spark application test serialization issue in spark  */ public class test {      static printwriter outputfilewriter;     static filewriter file;     static javasparkcontext ssc;      public static void main(string[] args) {           string inputfile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/inputfile.txt";          string master = "local";         string jobname = "testserialization";         string sparkhome = "/home/test/spark_installation/spark-0.7.0";         string sparkjar = "/home/test/testserializationissesinspark/testsparkserissueapp/target/testsparkserissueapp-0.0.1-snapshot.jar";           sparkconf conf = new sparkconf();         conf.set("spark.closure.serializer","org.apache.spark.serializer.kryoserializer");         conf.set("spark.kryo.registrator", "org.apache.spark.examples.myregistrator");         // create spark context         if(master.equals("local")){             ssc = new javasparkcontext("local", jobname,conf);             //ssc = new javasparkcontext("local", jobname);         } else {             ssc = new javasparkcontext(master, jobname, sparkhome, sparkjar);         }         javardd<string> testdata = ssc.textfile(inputfile).cache();         final notserializablejavaclass notserializabletestobject= new notserializablejavaclass("hi ");         @suppresswarnings({ "serial", "unchecked"})         javardd<string> classificationresults = testdata.map(                 new function<string, string>() {                     @override                     public string call(string inputrecord) throws exception {                             
               if(!inputrecord.isempty()) {                             //string[] pointdimensions = inputrecord.split(",");                             string result = "";                              try {                                 filewriter file = new filewriter("/home/test/testserializationissesinspark/results/test_result_" + (int) (math.random() * 100));                                 printwriter outputfile = new printwriter(file);                                  inetaddress ip;                                 ip = inetaddress.getlocalhost();                                 outputfile.println("ip of server: " + ip);                                  result = notserializabletestobject.testmethod(inputrecord);                                 outputfile.println("result: " + result);                                  outputfile.flush();                                 outputfile.close();                                 file.close();                              } catch (unknownhostexception e) {                                 e.printstacktrace();                             }                             catch (ioexception e1) {                                 e1.printstacktrace();                             }                               return result;                         } else {                             system.out.println("end of elements in stream.");                             string result = "end of elements in input data";                             return result;                         }                     }                  }).cache();           long processedrecords = classificationresults.count();          ssc.stop();         system.out.println("sssssssssss"+processedrecords);     } } 

Here is the KryoRegistrator class:

package org.apache.spark.examples;  import org.apache.spark.serializer.kryoregistrator;  import com.esotericsoftware.kryo.kryo;  public class myregistrator implements kryoregistrator {     public void registerclasses(kryo kryo) {         kryo.register(notserializablejavaclass.class);     } } 

Here is the class I am serializing:

package org.apache.spark.examples;  public class notserializablejavaclass {     public string testvariable;      public notserializablejavaclass(string testvariable) {         super();         this.testvariable = testvariable;     }      public string testmethod(string vartoappend){         return this.testvariable + vartoappend;     } } 

This is because spark.closure.serializer only supports the Java serializer. See the spark.closure.serializer entry at http://spark.apache.org/docs/latest/configuration.html


Comments

Popular posts from this blog

c++ - How to add Crypto++ library to Qt project -

jQuery Mobile app not scrolling in Firefox -

how to receive file in java(servlet/jsp) -