2016-09-03 20 views
5

Kiedy strumieniowe z Kafki użyciu Spark 2.0, otrzymuję następujący błąd:Nie Serializable wyjątek podczas czytania Kafka nagrywa z Spark Streaming

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...> 

Oto odnośny fragment kodu:

val ssc = new StreamingContext(sc, Seconds(2)) 

val topics = Array("ecfs") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

stream 
    .map(_.value()) 
    .flatMap(message => { 
    // parsing here... 
    }) 
    .foreachRDD(rdd => { 
    // processing here... 
    }) 

ssc.start() 

Z tego co wiem, to właśnie ta linia powoduje problem .map(_.value()), jak to naprawić?

Odpowiedz

0

Nie możesz użyć .map on Dstream: [String, String], jak tam użyłeś. Myślę, że można użyć transformacji i następnie zastosować mapę następująco

val streamed_rdd_final = streamed_rdd.transform{ rdd => rdd.map(x => x.split("\t")).map(x=>Array(check_time_to_send.toString,check_time_to_send_utc.toString,x(1),x(2),x(3),x(4),x(5))).map(x => x(1)+"\t"+x(2)+"\t"+x(3)+"\t"+x(4)+"\t"+x(5)+"\t"+x(6)+"\t"+x(7)+"\t")}

lub użyć .map jak kiedyś, ale raczej robi _.value() należy spróbować wysłać funkcję na mapie, jak I wykonano poniżej:

stream.map{case (x, y) => (y.toString)} 
Powiązane problemy