2016-08-13 10 views
11

Co chciałbym zrobić to w ten sposób:Jak wysłać ostateczne wyniki agregacji strumieni kafka w oknie KTT z okienkiem?

  1. Spożywać rekordy z tematu liczb (Longa)
  2. Kruszywo (licznik) wartości dla każdego okna 5 sek
  3. Wyślij ostatecznego wyniku agregacji do kolejnym tematem

Mój kod wygląda następująco:

KStream<String, Long> longs = builder.stream(
     Serdes.String(), Serdes.Long(), "longs"); 

// In one ktable, count by key, on a five second tumbling window. 
KTable<Windowed<String>, Long> longCounts = 
     longs.countByKey(TimeWindows.of("longCounts", 5000L)); 

// Finally, sink to the long-avgs topic. 
longCounts.toStream((wk, v) -> wk.key()) 
     .to("long-counts"); 

Wygląda na to, że wszystko działa zgodnie z oczekiwaniami, ale agregacje są wysyłane do tematu docelowego dla każdego przychodzącego rekordu. Moje pytanie brzmi: jak mogę wysłać tylko końcowy wynik agregacji każdego okna?

Odpowiedz

9

W strumieniach Kafka nie ma czegoś takiego jak "ostateczna agregacja". Okna są otwarte przez cały czas, aby obsłużyć późno przybywające rekordy (oczywiście okna nie są przechowywane wiecznie, są odrzucane, dopóki nie upłynie ich czas retencji - nie ma jednak specjalnej akcji, gdy okno zostanie odrzucone).

dokumentacja See Confluent więcej szczegółów: http://docs.confluent.io/current/streams/

Zatem dla każdej aktualizacji do agregacji, rekordowy wynik jest produkowany (bo Kafka Strumienie aktualizuje również wynik agregacji na koniec zapisów wchodzących). Twój "wynik końcowy" byłby ostatnim rekordem wyniku (zanim okno zostanie odrzucone). W zależności od przypadku zastosowania, instrukcja de-duplikacji byłby sposób, aby rozwiązać problem (stosując niższą API dźwignię transform() lub process())

Ten blogu może pomóc też: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

innym blogu rozwiązania tego posta wydanie bez użycia znaków interpunkcyjnych: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

+1

Ponadto: Nadchodząca funkcja strumieni Kafka zapewni opcję konfiguracji (bufor/pamięć podręczną, której rozmiar można skonfigurować) do sterowania przepływnością danych wyjściowych/wyjściowych strumieni Kafka. Jeśli ustawisz większy rozmiar bufora, więcej niższego poziomu aktualizacji zostanie połączonych, a tym samym obniżona zostanie prędkość pobierania. –

+2

Dzięki Matthias. Zaimplementowałem taki procesor, który przechowuje agregacje i przekazuje wyniki. Wygląda na to, że działa dobrze. – odavid

+0

To niefortunne. Migotanie podaje dane wyjściowe okna na końcu każdego "cyklu". Implementacja kafka sprawia, że ​​brzmi to tak, jak potrzebujemy zewnętrznego procesu timera do zrzutu KTable po zakończeniu każdego okna. – ethrbunny

Powiązane problemy