OK, zacznę z opracowanej przypadków użycia i będzie wyjaśnić moje pytanie:Wyjaśnić Kinesis Shard Iterator - AWS Java SDK
- używać 3rd party analityki internetowej platformy, która wykorzystuje AWS Kinesis strumieni w celu przekazania danych od klienta do ostatecznego miejsca docelowego - strumień Kinesis;
- Platforma analizy internetowej wykorzystuje 2 strumienie:
- Strumień modułów zbierających dane (pojedynczy strumień odłamków);
- Drugi strumień wzbogacający surowe dane ze strumienia kolektora (pojedynczy strumień odłamków); Co najważniejsze, strumień ten pobiera surowe dane z pierwszego strumienia przy użyciu typu iteratora;
- że zużywają dane ze strumienia przez AWS Java SDK, secifically użyciu klasy
GetShardIteratorRequest
; - Obecnie rozwijam klasę ekstrakcji, więc robię to synchronicznie, co oznacza, że konsumuję dane tylko wtedy, gdy kompiluję swoją klasę;
- Klasa zadziwiająco działa, chociaż są pewne rzeczy, których nie rozumiem, szczególnie w odniesieniu do sposobu, w jaki dane są zużywane ze strumienia i znaczenia każdego z typów iteratorów;
Mój problem jest to, że dane mi odzyskać to niespójne i nie ma w nim logiki chronologicznym.
Podczas korzystania
AT_SEQUENCE_NUMBER
i stanowią pierwszą liczbę sekwencji z fragmencie z.getSequenceNumberRange() getStartingSequenceNumber (.);
... jako "nie dostaję wszystkich rekordów. Podobnie,
AFTER_SEQUENCE_NUMBER
;- Kiedy używam
LATEST
, otrzymuję zero wyników; - Kiedy używam
TRIM_HORIZON
, który powinien nadawać się do użytku, nie wydaje się działać dobrze. Dawał mi dane, a następnie dodałem nowe "zdarzenia" (zapisy do ostatecznego strumienia) i otrzymałem zero rekordów. Zagadka.
Moje pytania to:
- W jaki sposób można bezpiecznie spożywać danych ze strumienia, bez konieczności martwienia się o nieodebranych zapisów?
- Czy istnieje alternatywa dla
ShardIteratorRequest
? - Jeśli tak, to w jaki sposób mogę po prostu "przeglądać" strumień i zobaczyć, co jest w środku, aby znaleźć odniesienia do debugowania?
- Czego mi brakuje w metodzie
TRIM_HORIZON
?
Z góry dziękuję, bardzo chciałbym dowiedzieć się nieco więcej o zużyciu danych ze strumienia Kinesis.
ja też mam podobne problemy - choć dla mnie, mam zduplikowane rekordy w każdej iteracji (z użyciem zarówno AT_SEQUENCE_NUMBER i FROM_SEQUENCE_NUMBER), mimo użyciu wartości NextShardIterator z każdej odpowiedzi. Dokumenty są nieco tajemnicze w tej kwestii .... Chciałbym również wiedzieć, co oznacza "untrimmed" (w.r.t TRIM_HORIZON). – Erve1879
Dla przypomnienia, zrobiłem coś w tym samym czasie - wziąłem istniejącego konsumenta Scali, który słucha strumienia w sposób ciągły i po prostu przesyła go z powrotem do czystej Javy dla moich celów. Oto aplikacja Scala, pierwotnie opracowana przez SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger
Niestety, nie jestem przyjazny dla java .....! Chciałbym tylko, żeby były agnostyczne, jasne wytyczne, jak zapewnić idempotencję i 100% "pokrycia" rekordów, jednocześnie pozwalając konsumentom na ponowne uruchomienie, awarie itp. Wydaje się, że negujemy cel Kinezy, jeśli musimy zapisać i sprawdzić w kolejności od SequenceNumber wszystkich wcześniej pobranych rekordów, aby zapewnić, że nie będą one duplikowane. Jestem pewien, że brakuje mi czegoś ....... – Erve1879