Co chciałbym zrobić to w ten sposób:Jak wysłać ostateczne wyniki agregacji strumieni kafka w oknie KTT z okienkiem?
- Spożywać rekordy z tematu liczb (Longa)
- Kruszywo (licznik) wartości dla każdego okna 5 sek
- Wyślij ostatecznego wyniku agregacji do kolejnym tematem
Mój kod wygląda następująco:
KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");
Wygląda na to, że wszystko działa zgodnie z oczekiwaniami, ale agregacje są wysyłane do tematu docelowego dla każdego przychodzącego rekordu. Moje pytanie brzmi: jak mogę wysłać tylko końcowy wynik agregacji każdego okna?
Ponadto: Nadchodząca funkcja strumieni Kafka zapewni opcję konfiguracji (bufor/pamięć podręczną, której rozmiar można skonfigurować) do sterowania przepływnością danych wyjściowych/wyjściowych strumieni Kafka. Jeśli ustawisz większy rozmiar bufora, więcej niższego poziomu aktualizacji zostanie połączonych, a tym samym obniżona zostanie prędkość pobierania. –
Dzięki Matthias. Zaimplementowałem taki procesor, który przechowuje agregacje i przekazuje wyniki. Wygląda na to, że działa dobrze. – odavid
To niefortunne. Migotanie podaje dane wyjściowe okna na końcu każdego "cyklu". Implementacja kafka sprawia, że brzmi to tak, jak potrzebujemy zewnętrznego procesu timera do zrzutu KTable po zakończeniu każdego okna. – ethrbunny