W mojej aplikacji do przesyłania strumieniowego Spark, chcę odwzorować wartość na podstawie słownika, który jest pobierany z backendu (ElasticSearch). Chcę okresowo odświeżać słownik okresowo, na wypadek gdyby został zaktualizowany w backend. Byłoby podobne do okresowej funkcji odświeżania filtru Logstash translate. Jak mogę to osiągnąć przy pomocy Sparka (np. W jakiś sposób niespecjalistyczna RDD co 30 sekund)?Spark Streaming: Jak okresowo odświeżać buforowane RDD?
6
A
Odpowiedz
5
Najlepszym sposobem, w jaki to zrobiłem, jest odtworzenie RDD i zachowanie zmiennego odniesienia do niego. Spark Streaming stanowi rdzeń ram harmonogramu na Spark. Możemy wycofać się z harmonogramu, aby okresowo aktualizować RDD. W tym celu używamy pustego DStream że możemy zaplanować tylko dla operacji odświeżania:
def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream
// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))
var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}
val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
w przeszłości, ja też próbowałem tylko przeplatanie cache()
i unpersist()
bez rezultatu (odświeża tylko raz). Odtwarzanie RDD usuwa cały rodowód i zapewnia czyste ładowanie nowych danych.
Powiązane problemy
- 1. Spark RDD checkpoint na trwali/buforowane RDD przeprowadzasz DAG dwukrotnie
- 2. Spark Streaming: foreachRDD aktualizuje moje mongo RDD
- 3. Spark Streaming MapWithState wydaje się okresowo odbudowywać stan kompletny
- 4. Spark streaming DStream RDD, aby uzyskać nazwę pliku
- 5. JSF, okresowo odświeżać komponent z ajaxem?
- 6. W Spark Streaming, jak wykryć pustą partię?
- 7. Czytanie z Cassandry za pomocą Spark Streaming
- 8. Spark: RDD do listy
- 9. Spark Streaming Kafka stream
- 10. Przetwarzanie w kolejnoś ci w Spark Streaming
- 11. Spark streaming z Kafka - createDirectStream vs createStream
- 12. Spark: Jak Unia listy <RDD> do RDD
- 13. Jak utworzyć zestaw danych Spark z RDD
- 14. Traktuj Spark RDD jak zwykły Seq
- 15. Apache Spark: Jak przekonwertować urządzenie Spark DataFrame na RDD z typem RDD [(Type1, Type2, ...)]?
- 16. Spark RDD - Mapowanie z dodatkowymi argumentami
- 17. Klasy typu Scalaz dla Apache Spark RDD
- 18. Jak utworzyć kolekcję RDD z RDD?
- 19. Implementacja niestandardowego Spark RDD w Javie
- 20. Spark Streaming - przetwarzanie pliku danych binarnych
- 21. Spark streaming 1.6.0 - Executory odbijające się
- 22. Scala Spark: Podział kolekcji na kilka RDD?
- 23. Tworzenie DataFrame Spark z RDD list
- 24. Filtr oparty na innym RDD w Spark
- 25. Spark streaming mapWithState timeout bez usuwania
- 26. Okresowa transmisja w Apache Spark Streaming
- 27. Nie znaleziono klasy KafkaUtils w Spark streaming
- 28. Nie Serializable wyjątek podczas czytania Kafka nagrywa z Spark Streaming
- 29. Jak wyświetlić listę RDD zdefiniowaną w powłoce Spark?
- 30. Jak przekonwertować Spark RDD na pandasową ramkę danych w ipython?
Czy istnieje alternatywa Java dla funkcji ConstantInputDStream? – user2100493
Czy gwarantuje się, że zadania odświeżania odniesienieData (wyzwalane przez getData()) zawsze się zdarzają przed zaplanowaniem zadań businessDataDStream? Czy mamy scenariusz, w którym referenceData refresh dzieje się, gdy zadania rdd.join (referenceData) są zaplanowane? – Cheeko
@maasg Jak zaplanować wywołanie 'getData()'? Z pytania "co 30 sekund"? –