2015-03-27 13 views
6

To jest przykład kodu robocze:org.apache.spark.SparkException: Zadanie nie serializable

JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap); 
messages.print(); 
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
}); 

otrzymuję poniższy błąd:

ERROR: 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) 
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140) 
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46) 
+1

dobrze, jeśli to działa to świetnie :). Jeśli tak nie jest, możesz włączyć debugowanie serializacji Java za pomocą opcji '-Dsun.io.serialization.extendedDebugInfo = true'. –

+1

Dziękuję, nie jest dobrze, próbowałem. JavaDStream linie = messages.map (nowa funkcja , String>() { @Override publiczne wezwanie String (Tuple2 tuple2) { tuple2._2 return(); } }); Ta linia problemów z kodem. –

+0

Całkiem pewien, że ten kod to Java, a nie Scala (to znaczy tag). – SparkleGoat

Odpowiedz

14

Skoro definiowania funkcji za pomocą mapy Anonimowa klasa wewnętrzna, klasa zawierająca musi również być Serializable. Zdefiniuj swoją funkcję mapy jako oddzielną klasę lub uczyń ją statyczną klasą wewnętrzną. Z dokumentacji Java (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.

+0

Bardzo dziękuję! –

+0

Cieszę się, że pomogło! Proszę przyjąć odpowiedź, jeśli to zrobiło – InPursuit

+0

Spark próbuje serializować obiekt przekazany do mapy, ale nie może serializować go, ponieważ nie implementuje Serializable? Dlaczego Spark robi serializację? A jeśli zdefiniujemy funkcję map jako oddzielną klasę, czy musimy ją również udostępnić Serializable? – Johan

2

tylko dostarczenie próbki kodu:

JavaDStream<String> lines = messages.map(mapFunc); 

zadeklarować wewnętrzną klasy jako zmiennej statycznej:

static Function<Tuple2<String, String>, String> mapFunc=new Function<Tuple2<String, String>, String>() { 
    @Override 
    public String call(Tuple2<String, String> tuple2) { 
     return tuple2._2(); 
    } 
} 
Powiązane problemy