Używam KafkaConsumer 0.10 Java api. Chcę spożywać z określonej partycji i określonego przesunięcia. Podniosłem wzrok i odkryłem, że istnieje metoda wyszukiwania, ale rzuca ona wyjątek. Ktoś miał podobny przypadek lub rozwiązanie?KafkaConsumer 0.10 Komunikat o błędzie Java API: Brak bieżącego przypisania partycji
Kod:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
Wyjątek
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
Dzięki. To zadziałało :) z kombinacją assign() i seek() – colossal
myślę, że masz na myśli 'group.id' zamiast' application.id' – automaticgiant
Odpowiadając na zbyt wiele pytań #KafkaStream tutaj ... Dzięki za wskazanie @automaticgiant –