2016-06-05 21 views
6

W mojej aplikacji do przesyłania strumieniowego Spark, chcę odwzorować wartość na podstawie słownika, który jest pobierany z backendu (ElasticSearch). Chcę okresowo odświeżać słownik okresowo, na wypadek gdyby został zaktualizowany w backend. Byłoby podobne do okresowej funkcji odświeżania filtru Logstash translate. Jak mogę to osiągnąć przy pomocy Sparka (np. W jakiś sposób niespecjalistyczna RDD co 30 sekund)?Spark Streaming: Jak okresowo odświeżać buforowane RDD?

Odpowiedz

5

Najlepszym sposobem, w jaki to zrobiłem, jest odtworzenie RDD i zachowanie zmiennego odniesienia do niego. Spark Streaming stanowi rdzeń ram harmonogramu na Spark. Możemy wycofać się z harmonogramu, aby okresowo aktualizować RDD. W tym celu używamy pustego DStream że możemy zaplanować tylko dla operacji odświeżania:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data 
val dstream = ??? // our data stream 

// a dstream of empty data 
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval)) 

var referenceData = getData() 
referenceData.cache() 
refreshDstream.foreachRDD{_ => 
    // evict the old RDD from memory and recreate it 
    referenceData.unpersist(true) 
    referenceData = getData() 
    referenceData.cache() 
} 

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData)) 
... etc ... 

w przeszłości, ja też próbowałem tylko przeplatanie cache() i unpersist() bez rezultatu (odświeża tylko raz). Odtwarzanie RDD usuwa cały rodowód i zapewnia czyste ładowanie nowych danych.

+0

Czy istnieje alternatywa Java dla funkcji ConstantInputDStream? – user2100493

+0

Czy gwarantuje się, że zadania odświeżania odniesienieData (wyzwalane przez getData()) zawsze się zdarzają przed zaplanowaniem zadań businessDataDStream? Czy mamy scenariusz, w którym referenceData refresh dzieje się, gdy zadania rdd.join (referenceData) są zaplanowane? – Cheeko

+0

@maasg Jak zaplanować wywołanie 'getData()'? Z pytania "co 30 sekund"? –

Powiązane problemy