2016-01-15 22 views
15

Chcę utworzyć nowy mongodb RDD za każdym razem gdy wchodzę do środka foreachRDD. Jednak mam problemy serializacji:Spark Streaming: foreachRDD aktualizuje moje mongo RDD

mydstream 
    .foreachRDD(rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     // ssc is my StreamingContext 
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }) 

to daje mi błąd:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 

jakiś pomysł?

+0

'SparkContext' nie podlega przekształcaniu do postaci szeregowej, więc nie można używać wewnątrz żadnej transformacji ani metod działania, należy używać tylko w klasie sterownika. – Shankar

+0

Czy istnieją konkretne powody, dla których konwertujesz listę na rdd w metodzie foreachRDD? – Shankar

Odpowiedz

7

Można spróbować użyć rdd.context zwracającą albo SparkContext lub SparkStreamingContext (jeśli RDD jest DStream).

mydstream foreachRDD { rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) }) 

Faktycznie, wydaje się, że RDD posiada również metodę .sparkContext. Szczerze mówiąc, nie znam różnicy, może są to aliasy (?).

2

W moim rozumieniu musisz dodać, jeśli masz obiekt "nie serializowalny", musisz go przekazać przez foreachPartition, abyś mógł nawiązać połączenie z bazą danych w każdym węźle przed uruchomieniem przetwarzania.

mydstream.foreachRDD(rdd => { 
     rdd.foreachPartition{ 
      val mongoClient = MongoClient("localhost", 27017) 
      val db = mongoClient(mongoDatabase) 
      val coll = db(mongoCollection) 
      // ssc is my StreamingContext 
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }}) 
+0

to nie zadziała, ponieważ ssc nie można serializować. –

+0

Możesz spróbować stworzyć swój ssc wewnątrz foreachRDD przed rdd.foreachPartition 'val ssc = StreamingContext.getOrCreate (checkpointdirectory, functionToCreateContext _)' – Rami

Powiązane problemy