2014-09-17 16 views
6

OK, zacznę z opracowanej przypadków użycia i będzie wyjaśnić moje pytanie:Wyjaśnić Kinesis Shard Iterator - AWS Java SDK

  1. używać 3rd party analityki internetowej platformy, która wykorzystuje AWS Kinesis strumieni w celu przekazania danych od klienta do ostatecznego miejsca docelowego - strumień Kinesis;
  2. Platforma analizy internetowej wykorzystuje 2 strumienie:
    1. Strumień modułów zbierających dane (pojedynczy strumień odłamków);
    2. Drugi strumień wzbogacający surowe dane ze strumienia kolektora (pojedynczy strumień odłamków); Co najważniejsze, strumień ten pobiera surowe dane z pierwszego strumienia przy użyciu typu iteratora;
  3. że zużywają dane ze strumienia przez AWS Java SDK, secifically użyciu klasy GetShardIteratorRequest;
  4. Obecnie rozwijam klasę ekstrakcji, więc robię to synchronicznie, co oznacza, że ​​konsumuję dane tylko wtedy, gdy kompiluję swoją klasę;
  5. Klasa zadziwiająco działa, chociaż są pewne rzeczy, których nie rozumiem, szczególnie w odniesieniu do sposobu, w jaki dane są zużywane ze strumienia i znaczenia każdego z typów iteratorów;

Mój problem jest to, że dane mi odzyskać to niespójne i nie ma w nim logiki chronologicznym.

  • Podczas korzystania AT_SEQUENCE_NUMBER i stanowią pierwszą liczbę sekwencji z fragmencie z

    .getSequenceNumberRange() getStartingSequenceNumber (.);

    ... jako "nie dostaję wszystkich rekordów. Podobnie, AFTER_SEQUENCE_NUMBER;

  • Kiedy używam LATEST, otrzymuję zero wyników;
  • Kiedy używam TRIM_HORIZON, który powinien nadawać się do użytku, nie wydaje się działać dobrze. Dawał mi dane, a następnie dodałem nowe "zdarzenia" (zapisy do ostatecznego strumienia) i otrzymałem zero rekordów. Zagadka.

Moje pytania to:

  1. W jaki sposób można bezpiecznie spożywać danych ze strumienia, bez konieczności martwienia się o nieodebranych zapisów?
  2. Czy istnieje alternatywa dla ShardIteratorRequest?
  3. Jeśli tak, to w jaki sposób mogę po prostu "przeglądać" strumień i zobaczyć, co jest w środku, aby znaleźć odniesienia do debugowania?
  4. Czego mi brakuje w metodzie TRIM_HORIZON?

Z góry dziękuję, bardzo chciałbym dowiedzieć się nieco więcej o zużyciu danych ze strumienia Kinesis.

+0

ja też mam podobne problemy - choć dla mnie, mam zduplikowane rekordy w każdej iteracji (z użyciem zarówno AT_SEQUENCE_NUMBER i FROM_SEQUENCE_NUMBER), mimo użyciu wartości NextShardIterator z każdej odpowiedzi. Dokumenty są nieco tajemnicze w tej kwestii .... Chciałbym również wiedzieć, co oznacza "untrimmed" (w.r.t TRIM_HORIZON). – Erve1879

+0

Dla przypomnienia, zrobiłem coś w tym samym czasie - wziąłem istniejącego konsumenta Scali, który słucha strumienia w sposób ciągły i po prostu przesyła go z powrotem do czystej Javy dla moich celów. Oto aplikacja Scala, pierwotnie opracowana przez SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger

+0

Niestety, nie jestem przyjazny dla java .....! Chciałbym tylko, żeby były agnostyczne, jasne wytyczne, jak zapewnić idempotencję i 100% "pokrycia" rekordów, jednocześnie pozwalając konsumentom na ponowne uruchomienie, awarie itp. Wydaje się, że negujemy cel Kinezy, jeśli musimy zapisać i sprawdzić w kolejności od SequenceNumber wszystkich wcześniej pobranych rekordów, aby zapewnić, że nie będą one duplikowane. Jestem pewien, że brakuje mi czegoś ....... – Erve1879

Odpowiedz

0

rozumiem zamieszania wyżej, i miałem te same problemy, ale myślę, że zorientowaliśmy się teraz. Zauważ, że używam urządzenia JSON API bezpośrednio bez KCL.

I wydaje się, że API daje klientom 2 podstawowe wybory iteratorów kiedy zaczynają zużywa strumieniowe:

A) TRIM_HORIZON: do czytania zapisów przeszłości opóźnione pomiędzy wieloma minut (nawet godzin), a 24 godzin. Nie wraca ostatnio wprowadzone rekordy. Użycie AFTER_SEQUENCE_NUMBER w ostatnim rekordzie widzianym przez ten iterator zwraca pustą tablicę, nawet jeśli rekordy zostały ostatnio PUT.

B) Najnowsze: do czytania PRZYSZŁE rekordy w czasie rzeczywistym (natychmiast po ich put). Zostałem oszukany przez jedyne zdanie dokumentacji, które mogłem znaleźć na temat tego "Rozpocznij czytanie tuż po ostatniej płycie w odłamku, abyś zawsze czytał najnowsze dane w odłamku." Otrzymujesz pustą tablicę, ponieważ nie zapisano żadnych rekordów od momentu otrzymania iteratora. Jeśli otrzymasz tego typu iterator, a następnie PUT record, ten rekord będzie natychmiast dostępny.

Na koniec, jeśli znasz identyfikator sekwencji ostatnio wprowadzonego rekordu, możesz go pobrać natychmiast, używając AT_SEQUENCE_NUMBER, a późniejsze rekordy można uzyskać za pomocą AFTER_SEQUENCE_NUMBER, nawet jeśli nie pojawią się w iteratorze TRIM_HORIZON.

Powyższe oznacza, że ​​jeśli chcesz wczytać wszystkie znane przeszłe zapisy i przyszłe zapisy w czasie rzeczywistym, musisz użyć kombinacji A i B, z logiką, aby poradzić sobie z zapisami pomiędzy (ostatnia przeszłość) . KCL może nad tym sprawować się gładko.

Powiązane problemy