2015-04-26 7 views
10

Aplikacja Spark Java generuje wyjątek NotSerializableException na pismach samoprzylepnych.Napisy w stylu hado-wym NotSerializableException z Apache Spark API

public final class myAPP { 
    public static void main(String[] args) throws Exception {  
    if (args.length < 1) { 
     System.err.println("Usage: myAPP <file>"); 
     System.exit(1); 
    } 
    SparkConf sparkConf = new SparkConf().setAppName("myAPP").setMaster("local"); 
    JavaSparkContext ctx = new JavaSparkContext(sparkConf); 
    Configuration conf = new Configuration(); 
    JavaPairRDD<LongWritable,Text> lines = ctx.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, conf); 
    System.out.println( lines.collect().toString()); 
    ctx.stop(); 
    } 

.

java.io.NotSerializableException: org.apache.hadoop.io.LongWritable 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (15227295,)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 1153163) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
15/04/26 16:05:05 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable 
Serialization stack: 
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (15227295,)) 
    - element of array (index: 0) 
    - array (class [Lscala.Tuple2;, size 1153163); not retrying 
15/04/26 16:05:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/04/26 16:05:05 INFO TaskSchedulerImpl: Cancelling stage 0 
15/04/26 16:05:05 INFO DAGScheduler: Job 0 failed: collect at Parser2.java:60, took 0.460181 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable 

W programie Spark Scala rejestruję pisma muflowe, jak poniżej, i działa dobrze.

sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text])) 

Najwyraźniej to podejście nie działa z Apache Spark API

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
sparkConf.set("spark.kryo.registrator", LongWritable.class.getName()); 

.

Napisy do hadoi NotSerializableException za pomocą Apache Spark Java API?

Odpowiedz

1

użycie

sparkConf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text") 

lub można po prostu użyć

ctx.textFile(args[0]); 

załadować RDD

+0

Wygląda powyżej kod jest dla Spark Scala API, ale problem jest z Spark Java API ... –

+0

@VijayInnamuri Yes w użyciu java sparkConf.set ("spark.kryo.classesToRegister", „org.apache.hadoop. io.LongWritable, org.apache.hadoop.io.Text "); iskry.kryo.registrator służy do definiowania klasy rejestratora kryo, która rozszerza org.apache.spark.serializer.KryoRegistrator i zastępuje metodę registerClasses. w metodzie registerClasses można zdefiniować klasy, aby zarejestrować się jak kryo.register (LongWritable.class); – banjara

8

Od Spark v1.4.0, można użyć tego API Java, aby zarejestrować się do klas w odcinkach użycie Kryo: https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkConf.html#registerKryoClasses(java.lang.Class[]) , poprzez przekazanie tablicy obiektów klasy, z których każdy można uzyskać stosując http://docs.oracle.com/javase/7/docs/api/java/lang/Class.html#forName(java.lang.String)

takich jak:

new SparkConf().registerKryoClasses(new Class<?>[]{ 
    Class.forName("org.apache.hadoop.io.LongWritable"), 
    Class.forName("org.apache.hadoop.io.Text") 
}); 

nadzieję, że to pomaga.

+0

Niesamowite !! Dzięki za tonę. Cały czas drapałem się po głowie. – nish

+0

@lew jesteś bardzo mile widziany! –