2015-05-05 21 views
6

Strumień danych strumieniowych przetwarza dane w mikropartiach.Udostępnianie danych strumieniowych w iskrze między partiami

Każde dane przedziału są przetwarzane równolegle przy użyciu RDD bez udostępniania danych między każdym interwałem.

Ale mój przypadek użycia musi dzielić dane między interwałami.

Rozważmy przykład Network WordCount, który daje liczbę wszystkich słów otrzymanych w tym przedziale.

Jak utworzyć następującą liczbę słów?

  • względna liczba dla słowa „Hadoop” i „iskry” z poprzedniego przedziału liczyć

  • Normal słowo liczyć wszystkich innych słów.

Uwaga: UpdateStateByKey wykonuje przetwarzanie stanowe, ale dotyczy to każdego rekordu, a nie poszczególnych rekordów.

Tak, UpdateStateByKey nie pasuje do tego wymogu.

Aktualizacja:

Rozważmy następujący przykład

Interval-1

Wejście:

Sample Input with Hadoop and Spark on Hadoop 

wyjściowa:

hadoop 2 
sample 1 
input 1 
with 1 
and 1 
spark 1 
on 1 

Interval-2

Wejście:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark 

wyjściowa:

another 3 
hadoop 1 
spark 2 
and 2 
sample 1 
input 1 
with 1 
on 1 

Objaśnienie:

Pierwszy interwał daje normalną liczbę słów wszystkich słów.

2. W przedziale Hadoop wystąpił 3 razy, ale wyjściowy powinien wynosić 1 (3-2)

zapłonowa wystąpił 3 razy, ale wyjściowy powinien wynosić od 2 (3-1)

Dla wszystkich innych słów powinien dać normalną liczbę słów.

więc podczas przetwarzania danych 2nd odstęp, to powinien mieć 1st przedział za liczby słów Hadoop i iskry

To jest prosty przykład z rysunku.

W rzeczywistych przypadkach pola, które wymagają udostępniania danych, są częścią elementu RDD (RDD), a ogromna liczba wartości musi być śledzona.

tj. W tym przykładzie, podobnie jak słowa hasłowe i iskry, należy śledzić blisko 100k słów kluczowych.

Podobne usecases Apache Burza:

Distributed caching in storm

Storm TransactionalWords

+2

Proszę wyjaśnić „względną liczbę dla słów«Hadoop»i«iskry»z poprzedniego przedziału count”. Nie wahaj się sformalizować definicji wprowadzając zmienne i formułę. Możesz również podać przykład. – huitseeker

+0

Odfiltruj niechciane i zaktualizujKluczKeyKey? –

+0

dodany przykład .. –

Odpowiedz

8

Jest to możliwe dzięki "pamiętanie" ostatni RDD otrzymał i przy użyciu LEFT JOIN scalić te dane z następnego streamingu partia. Używamy streamingContext.remember, aby umożliwić przechowywanie RDD wytworzonych przez proces przesyłania strumieniowego przez czas, który jest im potrzebny.

Korzystamy z faktu, że dstream.transform jest operacją wykonywaną na sterowniku i dlatego mamy dostęp do wszystkich lokalnych definicji obiektów. W szczególności chcemy zaktualizować zmienne odniesienie do ostatniego RDD o wymaganej wartości dla każdej partii.

Prawdopodobnie kawałek kodu sprawia, że ​​idea bardziej jasne:

// configure the streaming context to remember the RDDs produced 
// choose at least 2x the time of the streaming interval 
ssc.remember(xx Seconds) 

// Initialize the "currentData" with an empty RDD of the expected type 
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD 

// classic word count 
val w1dstream = dstream.map(elem => (elem,1))  
val count = w1dstream.reduceByKey(_ + _)  

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
       val interestingKeys = Set("hadoop", "spark")    
       val interesting = rdd.filter{case (k,v) => interestingKeys(k)}         
       val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))} 
       currentData = interesting 
       countDiff     
       } 

diffCount.print() 
+0

streamingContext.remember - pamiętaj, że RDD został wygenerowany w ostatnim podanym czasie trwania (może być cache/persist). Istnieje wiele transformacji RDD i akcji powiązanych z tym strumieniem danych strumieniowych. Tak, wiele RDDs musi zapamiętać .. –

+0

Jestem zainteresowany dowiedzieć się więcej na fakt, że dstream.transform jest operacją, która wykonuje na kierowcy. Czy mógłbyś opublikować jakiekolwiek odniesienia .. –

+0

Myślę, że leftOuterJoin będzie kosztowną operacją, gdy wielkość danych jest ogromna. tj. rdd, jeśli z 10 rekordów lakh i currentData jest 1 rekordów lakh, które są dystrybuowane w klastrze. –

Powiązane problemy