2016-08-26 7 views
11

Używam Sparka do konsumpcji danych z Kafki i zapisania go w Cassandrze. Mój program jest napisany w Javie. Używam biblioteki lib spark-streaming-kafka_2.10:1.6.2, aby to osiągnąć. Mój kod to:Jak pobrać identyfikator przesunięcia podczas konsumpcji Kafki ze Sparka, zapisać w Cassandrze i użyć jej do ponownego uruchomienia Kafki?

SparkConf sparkConf = new SparkConf().setAppName("name"); 
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
Map<String,String> kafkaParams = new HashMap<>(); 
kafkaParams.put("zookeeper.connect", "127.0.0.1"); 
kafkaParams.put("group.id", App.GROUP); 
JavaPairReceiverInputDStream<String, EventLog> messages = 
    KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class, 
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 
JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() { 
    @Override 
    public EventLog call(Tuple2<String, EventLog> tuple2) { 
     return tuple2._2(); 
    } 
}); 
lines.foreachRDD(rdd -> { 
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra(); 
}); 
jssc.start(); 

W moim stole Cassandra event_log, istnieje kolumna o nazwie offsetid przechowywać offset identyfikator strumienia. Jak uzyskać identyfikator przesunięcia do momentu, w którym strumień ten odczytał strumień Kafki i zapisał go w Cassandrze?

Po zapisaniu go w Cassanderze, chcę użyć najnowszego identyfikatora przesunięcia, który będzie używany, gdy Spark zostanie ponownie uruchomiony. Jak mogę to zrobić?

Odpowiedz

3

Poniżej znajduje się kod dla odniesienia może trzeba zmienić rzeczy, jak na swoje wymagania. To, co zrobiłem z kodem i podejściem, polega na tym, że zachowam rozsądne przesunięcie partycji Kafki dla każdego tematu w Cassandrze (Można to zrobić w zookeeperze także jako sugestię używając apletu java). Zapisz lub zaktualizuj najnowszy zakres przesunięcia dla tematu, otrzymując każdą wiadomość tekstową, w tabeli dziennika zdarzeń. Dlatego zawsze pobieraj z tabeli i zobacz, czy jest obecny, a następnie utwórz bezpośredni strumień z tego offsetu, w przeciwnym razie nowy bezpośredni strumień.

package com.spark; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo; 

import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.List; 
import java.util.Map; 

import kafka.common.TopicAndPartition; 
import kafka.message.MessageAndMetadata; 
import kafka.serializer.StringDecoder; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.HasOffsetRanges; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.kafka.OffsetRange; 

import scala.Tuple2; 

public class KafkaChannelFetchOffset { 
    public static void main(String[] args) { 
     String topicName = "topicName"; 
     SparkConf sparkConf = new SparkConf().setAppName("name"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName)); 
     HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>(); 
     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("zookeeper.connect", "127.0.0.1"); 
     kafkaParams.put("group.id", "GROUP"); 
     kafkaParams.put("metadata.broker.list", "127.0.0.1"); 
     List<EventLog> eventLogList = javaFunctions(jssc).cassandraTable("test", "event_log", mapRowTo(EventLog.class)) 
       .select("topicName", "partion", "fromOffset", "untilOffset").where("topicName=?", topicName).collect(); 
     JavaDStream<String> kafkaOutStream = null; 
     if (eventLogList == null || eventLogList.isEmpty()) { 
      kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, 
        topicsSet).transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() { 
       @Override 
       public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception { 
        JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() { 
         @Override 
         public String call(Tuple2<String, String> arg0) throws Exception { 
          return arg0._2; 
         } 
        }); 
        writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges()); 
        return rdd; 
       } 
      }); 
     } else { 
      for (EventLog eventLog : eventLogList) { 
       kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())), 
         Long.parseLong(eventLog.getUntilOffset())); 
      } 
      kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, 
        kafkaParams, kafkaTopicPartition, new Function<MessageAndMetadata<String, String>, String>() { 
         @Override 
         public String call(MessageAndMetadata<String, String> arg0) throws Exception { 
          return arg0.message(); 
         } 
        }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() { 

       @Override 
       public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { 
        writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges()); 
        return rdd; 
       } 
      }); 
     } 
     // Use kafkaOutStream for further processing. 
     jssc.start(); 
    } 

    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) { 
     for (OffsetRange offsetRange : offsets) { 
      EventLog eventLog = new EventLog(); 
      eventLog.setTopicName(String.valueOf(offsetRange.topic())); 
      eventLog.setPartition(String.valueOf(offsetRange.partition())); 
      eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset())); 
      eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset())); 
      javaFunctions(rdd).writerBuilder("test", "event_log", null).saveToCassandra(); 
     } 
    } 
} 

Nadzieja to pomaga i rozwiązać problem ...

+0

Moje 'JavaRDD' jest typu' EventLog'. W nim jest zmienna, która będzie zawierała "offsetId". Jak dodać 'offsetId' do mojego rdd? Nowy obiekt nie zostanie utworzony. Jak zapisać identyfikator przesunięcia w Zookeeperze? – khateeb

+0

Mam inny problem. 'javaFunctions (jssc)' pokazuje błąd mówiąc, że 'metoda javaFunctions (SparkContext) w typie CassandraJavaUtil nie jest odpowiednia dla argumentów (JavaStreamingContext)'. Jakiej wersji używasz? – khateeb

+0

Wystąpienie następującego błędu (jeśli zablokowane): org.apache.spark.rdd.MapPartitionsRDD nie można przesłać do org.apache.spark.streaming.kafka.HasOffsetRanges –

3

A więc chcesz zarządzać offsetami kafka na własną rękę.

W tym:

  1. użycie createDirectStream zamiast createStream. Umożliwi to określenie, z których przesunięć chciałbyś odczytać (fromOffsets: Map[TopicAndPartition, Long])

  2. zebranie informacji o przesunięciach, które już przetworzyłeś. Można to zrobić, zapisując przesunięcie dla każdej wiadomości lub możesz zebrać te informacje w osobnej tabeli. Aby uzyskać przesunięcia w zakresie od rdd: rdd.asInstanceOf[HasOffsetRanges].offsetRanges. Java (zgodnie z dokumentacją) http://spark.apache.org/docs/latest/streaming-kafka-integration.htmlOffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

+1

Poprosiłem o kodzie Java. Rozwiązanie, które podałeś, jest przeznaczone dla Scala. – khateeb

Powiązane problemy