Odpowiedz

6

Tak, można użyć wersji MessageAndMetadata w wersji createDirectStream, która umożliwia dostęp do message metadata.

Możesz znaleźć przykład tutaj, który zwraca Dstream z tuple3.

val ssc = new StreamingContext(sparkConf, Seconds(10)) 

val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker)) 
var fromOffsets = Map[TopicAndPartition, Long]() 
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0) 
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0) 
fromOffsets += (topicAndPartition -> inputOffset) 
fromOffsets += (topicAndPartition1 -> inputOffset1) 

val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => { 
    (mmd.topic ,mmd.offset, mmd.message().toString) 
    }) 

W powyższym przykładzie tuple3._1 będzie miał topic, tuple3._2 będzie miał offset i tuple3._3 będzie miał message.

Mam nadzieję, że to pomoże!

+0

Jeśli mam rację, będę mógł czytać z określonego offsetu. Nadal zastanawiam się, czy istnieje prosty sposób obliczenia przesunięcia początkowego każdej wiadomości w obrębie partycji. Potrzebuję zapisać offset dla każdej wiadomości, a następnie użyć tego kodu do odczytania konkretnej wiadomości. Dziękuję Ci! –

+0

Tak, masz rację, ale z powyższym kodem otrzymasz także offset związany z każdą wiadomością w 'messagesDStream'. Mam na myśli 'createDirectStream' daje' Dstream' 'Tuple3', aw każdej krotce otrzymasz' topic-name' oraz 'message' oraz związane z nim' offset'. – avr

+0

Cześć, przepraszam za opóźnioną odpowiedź .. Działa. Zakładam jednak, że "odOffset" jest przesunięciem początkowym, z którego skanuje partycję. Dziękuję bardzo avr. –

Powiązane problemy