Java - Kryo serialization of a class (task object) in Apache Spark returns null on deserialization
I am using the Java Spark API to write a test application. I am using a class that doesn't extend the Serializable interface. To make the application work, I am using the Kryo serializer to serialize the class. The problem I observed while debugging is that during deserialization the returned class object becomes null, which in turn throws a NullPointerException. It seems to be a closure problem where things are going wrong, but I'm not sure. Since I'm new to this kind of serialization, I don't know where to start digging.
Here is the code I am testing with:
package org.apache.spark.examples;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * Spark application to test a serialization issue in Spark.
 */
public class Test {
    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;

    public static void main(String[] args) {
        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/inputfile.txt";
        String master = "local";
        String jobName = "testserialization";
        String sparkHome = "/home/test/spark_installation/spark-0.7.0";
        String sparkJar = "/home/test/testserializationissesinspark/testsparkserissueapp/target/testsparkserissueapp-0.0.1-snapshot.jar";

        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");

        // Create the Spark context
        if (master.equals("local")) {
            ssc = new JavaSparkContext("local", jobName, conf);
            // ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }

        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject = new NotSerializableJavaClass("hi ");

        @SuppressWarnings({ "serial", "unchecked" })
        JavaRDD<String> classificationResults = testData.map(
            new Function<String, String>() {
                @Override
                public String call(String inputRecord) throws Exception {
                    if (!inputRecord.isEmpty()) {
                        // String[] pointDimensions = inputRecord.split(",");
                        String result = "";
                        try {
                            FileWriter file = new FileWriter("/home/test/testserializationissesinspark/results/test_result_" + (int) (Math.random() * 100));
                            PrintWriter outputFile = new PrintWriter(file);
                            InetAddress ip;
                            ip = InetAddress.getLocalHost();
                            outputFile.println("IP of the server: " + ip);
                            result = notSerializableTestObject.testMethod(inputRecord);
                            outputFile.println("Result: " + result);
                            outputFile.flush();
                            outputFile.close();
                            file.close();
                        } catch (UnknownHostException e) {
                            e.printStackTrace();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                        return result;
                    } else {
                        System.out.println("End of elements in the stream.");
                        String result = "End of elements in input data";
                        return result;
                    }
                }
            }).cache();

        long processedRecords = classificationResults.count();
        ssc.stop();
        System.out.println("sssssssssss" + processedRecords);
    }
}
Here is the KryoRegistrator class:
package org.apache.spark.examples;

import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}
Here is the class I am serializing:
package org.apache.spark.examples;

public class NotSerializableJavaClass {
    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }

    public String testMethod(String varToAppend) {
        return this.testVariable + varToAppend;
    }
}
This is because spark.closure.serializer only supports the Java serializer; Kryo cannot be used to serialize closures. See the spark.closure.serializer entry at http://spark.apache.org/docs/latest/configuration.html
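Since closures always go through the Java serializer, one way around this is to make the class captured by the closure implement java.io.Serializable, and reserve Kryo (via spark.serializer) for RDD data. A minimal sketch, assuming that is the intent:

package org.apache.spark.examples;

import java.io.Serializable;

// Implementing Serializable lets the default Java closure serializer
// ship this object to the workers inside the task closure.
public class NotSerializableJavaClass implements Serializable {
    private static final long serialVersionUID = 1L;

    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        this.testVariable = testVariable;
    }

    public String testMethod(String varToAppend) {
        return this.testVariable + varToAppend;
    }
}

In the driver, the spark.closure.serializer setting would then be dropped; the Kryo registration can stay for data serialization:

SparkConf conf = new SparkConf();
// Use Kryo for RDD data only; closures still use Java serialization.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");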