2014-10-23 16 views
8

Kontekst: Używam Apache Spark do agregowania liczby uruchomień różnych typów zdarzeń z logów. Dzienniki są przechowywane w obu Cassandra do celów analizy historycznej i Kafka do celów analizy w czasie rzeczywistym. Każdy dziennik ma datę i typ wydarzenia. Dla uproszczenia załóżmy, że chciałem śledzić liczbę dzienników jednego typu na każdy dzień.Połączyć wyniki z partii RDD z strumieniowaniem RDD w Apache Spark

Posiadamy dwa RDD, RDD danych wsadowych od Cassandry i inny streaming RDD od Kafki. Pseudokod:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); 

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() { 
    @Override 
    public Tuple2<String, Integer> call(CassandraRow row) { 
     return new Tuple2<String, Integer>(row.getString("date"), 1); 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}); 

save(batchRDD) // Assume this saves the batch RDD somewhere 

... 

// Assume we read a chunk of logs from the Kafka stream every x seconds. 
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...); 
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { 
    @Override 
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) { 
     String jsonString = data._2; 
     JSON jsonObj = JSON.parse(jsonString); 
     Date eventDate = ... // get date from json object 
     // Assume startTime is broadcast variable that is set to the time when the job started. 
     if (eventDate.after(startTime.value())) { 
      ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(); 
      pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); 
      return pairs; 
     } else { 
      return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs 
     } 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() { 
    @Override 
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) { 
     Integer previousValue = state.or(0l); 
     Integer currentValue = ... // Sum of counts 
     return Optional.of(previousValue + currentValue); 
    } 
}); 
save(streamRDD); // Assume this saves the stream RDD somewhere 

sc.start(); 
sc.awaitTermination(); 

Pytanie: Jak mogę połączyć wyniki z streamRDD z batchRDD? Powiedzmy, że batchRDD ma następujące dane i ta praca była prowadzona na 2014-10-16:

("2014-10-15", 1000000) 
("2014-10-16", 2000000) 

Ponieważ zapytania Cassandra zawarte wszystkie dane tylko do czasu rozpoczęcia kwerendy partii, musimy odczytać z Kafki po zakończeniu zapytania, uwzględniając tylko dzienniki po czasie rozpoczęcia zadania. Zakładamy, że zapytanie trwa długo. Oznacza to, że muszę połączyć historyczne wyniki z wynikami transmisji strumieniowej.

Dla ilustracji:

|------------------------|-------------|--------------|---------> 
tBatchStart    tStreamStart streamBatch1 streamBatch2 

Następnie załóżmy, że w pierwszym strumieniu partii mamy te dane:

("2014-10-19", 1000) 

Następnie chcę połączyć wsadowy RDD z tego strumienia RDD tak, że strumień RDD ma teraz wartość:

("2014-10-19", 2001000) 

Załóżmy, że w drugim strumieniu ws e dostaje te dane:

("2014-10-19", 4000) 

Następnie RDD strumień powinien być aktualizowany mieć wartość:

("2014-10-19", 2005000) 

i tak dalej ...

jest to możliwe do wykorzystania streamRDD.transformToPair(...) połączyć streamRDD dane z danymi batchRDD za pomocą join, ale jeśli zrobimy to dla każdego fragmentu strumienia, to będziemy dodawać liczbę z partiRDD dla każdego fragmentu strumienia, tworząc wartość stanu "double counted", kiedy to powinno być dodane tylko do porcja pierwszego strumienia.

Odpowiedz

4

Aby rozwiązać tę sprawę, ja bym Związku bazę RDD w wyniku zespolonego StateDStream który utrzymuje sumy z danych strumieniowych. To skutecznie zapewnia linię podstawową dla danych zgłaszanych w każdym przedziale czasowym przesyłania strumieniowego, bez zliczania wspomnianej linii bazowej x razy.

Próbowałem tego pomysłu, używając przykładowego WordCount i działa.Rzuć to na REPL na żywo przykład:

(wykorzystanie nc -lk 9876 na osobnej powłoce, aby zapewnić wejście do socketTextStream)

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7) 
val defaultRdd = sc.parallelize(defaults) 

@transient val ssc = new StreamingContext(sc, Seconds(10)) 
ssc.checkpoint("/tmp/spark") 

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) 
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0)) 
} 
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey(_+_) 

wordCount.print() 
historicCount.print() 
runningTotal.print() 
ssc.start() 
+1

Dzięki. Chciałbym tylko dodać, że zamiast używać 'rdd.union (defaultRdd)' w transformacji, skończyłem używając 'rdd.leftOuterJoin (defaultRdd)' tak, aby 'runningTotal' nie zawierał par, które nie zostały zmienione. Następnie muszę tylko zapisać pary, w których zmieniły się ich wartości. – Bobby

0

Mogłeś dać updateStateByKey spróbować:

def main(args: Array[String]) { 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 
     val previousCount = state.getOrElse(0) 
     Some(currentCount + previousCount) 
    } 

    // stream 
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)) 
    ssc.checkpoint(".") 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc) 
    stateWordCounts.print() 
    ssc.start() 
    ssc.awaitTermination() 
} 
+0

ja już go używać. Problem polega na tym, że jeśli opcjonalna wartość stanu jest zerowa, to muszę ustawić wartość domyślną. Idealnie byłoby to wartością obliczoną z wsadu RDD. Problem polega na tym, że 'updateStateByKey()' nie przekazuje klucza, więc nie mogę wykonać wyszukiwania wartości wyliczonej z wsadowego RDD. – Bobby

Powiązane problemy