2016-11-24 11 views
7

Mam 2 tematy Kafki przesyłające dokładnie tę samą treść z różnych źródeł, więc mogę mieć wysoką dostępność w przypadku awarii jednego ze źródeł. Próbuję połączyć 2 wątki w jeden temat wyjściowy przy użyciu strumieni Kafka 0.10.1.0, tak, że nie pomijam żadnych komunikatów o błędach i nie ma duplikatów, gdy wszystkie źródła są w górze.Scalanie wielu identycznych tematów Strefy Kafki

Podczas korzystania z metody KStream w wersji leftJoin, jeden z tematów może zejść bez problemu (temat dodatkowy), ale gdy główny temat ulegnie zmniejszeniu, nic nie jest wysyłane do tematu wyjściowego. To wydaje się być, ponieważ zgodnie z Kafka Streams developer guide,

KStream-KStream leftJoin zawsze jest napędzany przez zapisy przybywających z głównego strumienia

więc jeśli nie istnieją żadne zapisy pochodzące z głównego strumienia, to nie użyje rekordów ze strumienia dodatkowego, nawet jeśli istnieją. Gdy główny strumień powróci do trybu online, dane wyjściowe zostaną wznowione normalnie.

Ja również próbowałem przy użyciu outerJoin (który dodaje duplikaty rekordów), a następnie konwersję do KTable i groupByKey pozbyć się duplikatów,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

ale wciąż otrzymuję duplikaty raz na jakiś czas. Używam również commit.interval.ms=200, aby uzyskać KTable do wysyłania do strumienia wyjściowego dość często.

Jaki byłby najlepszy sposób podejścia do tego scalenia, aby uzyskać jednokrotne dane wyjściowe z wielu identycznych tematów wejściowych?

+0

Ogólnie zalecam procesor API, aby rozwiązać problem. Możesz także spróbować przełączyć na aktualną wersję "trunk" (nie masz pewności, czy to możliwe). Połączenia zostały przerobione, co może rozwiązać Twój problem: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics Nowa semantyka join będzie zawarta w Kafce '0.10.2', która ma datę premiery na styczeń 2017 r. (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan). –

+0

@ MatthiasJ.Sax Przełączyłem się do pnia i wygląda na to, że 'leftJoin' zachowuje się teraz jak' outerJoin' dla złącz KStream-KStream, więc myślę, że powrócę do semantyki 10.1. Teraz próbuję stworzyć fałszywy strumień, który generuje wartości null, które wykorzystam jako podstawowy w lewym połączeniu z tym, który był pierwotny, i wykorzystam to połączenie w lewe połączenie z drugorzędnym. Mam nadzieję, że spowoduje to zawsze posiadanie wartości w strumieniu głównym, nawet jeśli mój podstawowy jest wyłączony (ponieważ otrzymam wartość zerową z pierwszej lewej strony). –

+0

Nowe 'leftJoin' uruchamia z obu stron tak samo jak stare' outerJoin' (myślę, że to masz na myśli "wydaje się, że leftJoin teraz zachowuje się jak zewnętrzna strona"?) - jest to bliższe semantyki SQL niż stare 'leftJoin' - ale' leftJoin' jest nadal inne niż 'outerJoin': jeśli prawa strona uruchamia się i nie znajduje partnera, to zapisuje rekord i nie jest emitowany żaden wynik . –

Odpowiedz

5

Użycie dowolnego rodzaju łączenia nie rozwiąże problemu, ponieważ zawsze kończy się wynikiem brakującym (wewnętrzne połączenie w przypadku niektórych straganów) lub "duplikatami" z null (połączenie lewostronne lub połączenie zewnętrzne) przypadku oba strumienie są online). Zobacz https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics, aby uzyskać szczegółowe informacje na temat semantyki łączenia w strumieniach Kafka.

Tak więc, polecam użycie procesora API, które można mix-and-match z DSL przy użyciu KStreamprocess(), transform() lub transformValues(). Aby uzyskać więcej informacji, patrz How to filter keys and value with a Processor using Kafka Stream DSL.

Można także dodać niestandardowy magazyn do procesora (How to add a custom StateStore to the Kafka Streams DSL processor?), aby filtrowanie błędów było odporne na błędy powielania.

Powiązane problemy