Kontekst: Używam Apache Spark do agregowania liczby uruchomień różnych typów zdarzeń z logów. Dzienniki są przechowywane w obu Cassandra do celów analizy historycznej i Kafka do celów analizy w czasie rzeczywistym. Każdy dziennik ma datę i typ wydarzenia. Dla uproszczenia załóżmy, że chciałem śledzić liczbę dzienników jednego typu na każdy dzień.Połączyć wyniki z partii RDD z strumieniowaniem RDD w Apache Spark
Posiadamy dwa RDD, RDD danych wsadowych od Cassandry i inny streaming RDD od Kafki. Pseudokod:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
Pytanie: Jak mogę połączyć wyniki z streamRDD z batchRDD? Powiedzmy, że batchRDD
ma następujące dane i ta praca była prowadzona na 2014-10-16:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
Ponieważ zapytania Cassandra zawarte wszystkie dane tylko do czasu rozpoczęcia kwerendy partii, musimy odczytać z Kafki po zakończeniu zapytania, uwzględniając tylko dzienniki po czasie rozpoczęcia zadania. Zakładamy, że zapytanie trwa długo. Oznacza to, że muszę połączyć historyczne wyniki z wynikami transmisji strumieniowej.
Dla ilustracji:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
Następnie załóżmy, że w pierwszym strumieniu partii mamy te dane:
("2014-10-19", 1000)
Następnie chcę połączyć wsadowy RDD z tego strumienia RDD tak, że strumień RDD ma teraz wartość:
("2014-10-19", 2001000)
Załóżmy, że w drugim strumieniu ws e dostaje te dane:
("2014-10-19", 4000)
Następnie RDD strumień powinien być aktualizowany mieć wartość:
("2014-10-19", 2005000)
i tak dalej ...
jest to możliwe do wykorzystania streamRDD.transformToPair(...)
połączyć streamRDD dane z danymi batchRDD za pomocą join
, ale jeśli zrobimy to dla każdego fragmentu strumienia, to będziemy dodawać liczbę z partiRDD dla każdego fragmentu strumienia, tworząc wartość stanu "double counted", kiedy to powinno być dodane tylko do porcja pierwszego strumienia.
Dzięki. Chciałbym tylko dodać, że zamiast używać 'rdd.union (defaultRdd)' w transformacji, skończyłem używając 'rdd.leftOuterJoin (defaultRdd)' tak, aby 'runningTotal' nie zawierał par, które nie zostały zmienione. Następnie muszę tylko zapisać pary, w których zmieniły się ich wartości. – Bobby