Próbuję uzyskać i zapisać przesunięcie dla konkretnej wiadomości w Kafce, używając strumienia bezpośredniego Spark. Przeglądanie dokumentacji Spark to prosty sposób na uzyskanie przesunięć zakresu dla każdej partycji, ale potrzebuję zapisać offset początkowy dla każdej wiadomości tematu po pełnym skanowaniu kolejki.Czy możliwe jest uzyskanie określonego przesunięcia komunikatu w Kafce + SparkStreaming?
5
A
Odpowiedz
6
Tak, można użyć wersji MessageAndMetadata w wersji createDirectStream
, która umożliwia dostęp do message metadata
.
Możesz znaleźć przykład tutaj, który zwraca Dstream z tuple3
.
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})
W powyższym przykładzie tuple3._1
będzie miał topic
, tuple3._2
będzie miał offset
i tuple3._3
będzie miał message
.
Mam nadzieję, że to pomoże!
Powiązane problemy
- 1. Przesunięcia przechowywane w Zookeeperze lub Kafce?
- 2. Uzyskanie określonego poziomu kategorii
- 3. Zaawansowane gesty iOS: uzyskanie wektora przesunięcia
- 4. Czy możliwe jest uzyskanie obiektu wywołującego zamknięcia w groovy?
- 5. MySQL - Czy możliwe jest uzyskanie "różnicy" dwóch wyników zapytania?
- 6. czy możliwe jest uzyskanie unikalnego numeru identyfikacyjnego z urządzenia mobilnego?
- 7. Czy jest możliwe uzyskanie maksymalnej możliwej długości zmiennej
- 8. Grails - uzyskanie wartości komunikatu od kontrolera
- 9. jest możliwe uzyskanie różnych wyników przy użyciu NSFetchedResultsController?
- 10. Czy jest możliwe uzyskanie docelowej wartości właściwości css podczas przejścia css3 w JavaScript?
- 11. Czy możliwe jest uzyskanie dostępu do wartości parametrów szablonu non-type w specjalnej klasie szablonów?
- 12. Czy jest możliwe skonfigurowanie "wzorca zaznaczania komunikatu" dla kroku wiersza komend definicji definicji TFS2015.2?
- 13. Czy można dodać partycje do istniejącego tematu w Kafce 0.8.2
- 14. Jak działa rebalancing konsumenta w Kafce?
- 15. Czy jest możliwe ustawienie koloru dla określonego słowa w visual studio 9?
- 16. Czy wykrywanie trafień HTML5 jest możliwe?
- 17. Jak korzystać z wielu klientów w Kafce?
- 18. Czy możliwe jest uzyskanie nowych wartości identyfikatora (IDENTITY) przed wstawieniem danych do tabeli?
- 19. Czy jest możliwe uzyskanie $ _FILES ["inputID"] ["tmp_name"] z pola pliku przy użyciu javascript?
- 20. Jak wyświetlić listę producentów w kafce
- 21. Czy w JavaScript jest możliwe metaprogramowanie?
- 22. Czy jest możliwe użycie "number_to_currency" w kontrolerze?
- 23. Czy możliwe jest rozszerzenie tablic w C#?
- 24. Czy jest możliwe utworzenie IME w WinRT?
- 25. Czy możliwe jest przeciążenie operatorów w C?
- 26. Czy możliwe jest metaprogramowanie w C#?
- 27. Czy jest możliwe odtworzenie ścieżki w CAKeyFrameAnimation?
- 28. Czy jest możliwe użycie CallerMemberNameAttribute w f #
- 29. Czy możliwe jest sprawdzenie, czy Handler rozpoznał, że został usunięty?
- 30. Czy możliwe jest posiadanie wielu atrybutów danych- {imie} w HTML5?
Jeśli mam rację, będę mógł czytać z określonego offsetu. Nadal zastanawiam się, czy istnieje prosty sposób obliczenia przesunięcia początkowego każdej wiadomości w obrębie partycji. Potrzebuję zapisać offset dla każdej wiadomości, a następnie użyć tego kodu do odczytania konkretnej wiadomości. Dziękuję Ci! –
Tak, masz rację, ale z powyższym kodem otrzymasz także offset związany z każdą wiadomością w 'messagesDStream'. Mam na myśli 'createDirectStream' daje' Dstream' 'Tuple3', aw każdej krotce otrzymasz' topic-name' oraz 'message' oraz związane z nim' offset'. – avr
Cześć, przepraszam za opóźnioną odpowiedź .. Działa. Zakładam jednak, że "odOffset" jest przesunięciem początkowym, z którego skanuje partycję. Dziękuję bardzo avr. –