2016-12-07 3 views
5

Używam KafkaConsumer 0.10 Java api. Chcę spożywać z określonej partycji i określonego przesunięcia. Podniosłem wzrok i odkryłem, że istnieje metoda wyszukiwania, ale rzuca ona wyjątek. Ktoś miał podobny przypadek lub rozwiązanie?KafkaConsumer 0.10 Komunikat o błędzie Java API: Brak bieżącego przypisania partycji

Kod:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
consumer.seek(new TopicPartition("mytopic", 1), 4); 

Wyjątek

java.lang.IllegalStateException: No current assignment for partition mytopic-1 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) 
    at xx.xxx.xxx.Test.main(Test.java:182) 

Odpowiedz

18

zanim będzie można seek() trzeba najpierw subscribe() do tematu lubassign() rozbiór tematu do konsumenta. Należy również pamiętać, że subscribe() i assign() są leniwe - w związku z tym należy również wykonać "fałszywe połączenie" z poll(), zanim będzie można użyć seek().

Jeśli używasz subscribe(), używasz zarządzania grupami: w ten sposób możesz uruchomić wielu klientów używając tego samego group.id, a wszystkie partycje tematu będą przydzielane równomiernie dla wszystkich klientów w grupie automatycznie (każda partycja zostanie przypisana do pojedynczy konsument w grupie).

Aby odczytywać określone partycje, należy użyć przypisania ręcznego za pomocą assign(). Pozwala to wykonać dowolne zadanie.

Btw: KafkaConsumer ma bardzo długą, szczegółową JavaDoc klasę, w tym przykłady. Warto to przeczytać.

+0

Dzięki. To zadziałało :) z kombinacją assign() i seek() – colossal

+1

myślę, że masz na myśli 'group.id' zamiast' application.id' – automaticgiant

+0

Odpowiadając na zbyt wiele pytań #KafkaStream tutaj ... Dzięki za wskazanie @automaticgiant –

1

Jeśli nie chcesz korzystać z poll() i pobierać zapisy map, a także zmienić samo przesunięcie. wersja Kafka 0,11 Spróbuj tego:

... 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); 
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); 
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);  

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); 
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings 
consumer.seekToBeginning(partitions); //or other seek 

Sonda koordynator imprez. Zapewnia to, że koordynator jest znany, a konsument dołączył do grupy (jeśli korzysta z zarządzania grupą). To również obsługuje zatwierdzenia przesunięcia okresowego, jeśli są włączone.

Powiązane problemy