2016-02-26 20 views
7

Po tworzenie wielu konsumentów (używając Kafka 0.9 Java API), a każdy wątek rozpoczęty, ja otrzymuję następujący wyjątekKafka CommitFailedException wyjątek konsument

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
class com.messagehub.consumer.Consumer is shutting down. 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681) 
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654) 
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157) 
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936) 
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905) 

a następnie rozpocząć spożywanie wiadomość normalnie, chciałbym wiedzieć, co powoduje ten wyjątek w celu jego usunięcia.

+0

Hugo, czy nadal występuje ten problem? czy możesz podać więcej informacji? – Nautilus

+0

Tak @nautilus, nadal mam ten problem. Mam 3 klientów, wszyscy w tej samej grupie konsumentów, mam temat z 20 partycjami, z których należy odczytać dane. Ten wyjątek występuje losowo, niemniej jednak konsumenci mogą odczytać dane z tematu/partycji, chociaż ten wyjątek jest wyzwalany. –

+0

konsumenci konsumują dane lub przetwarzają je?Widzę na twoim stosie, że wyjątek ma miejsce, gdy próbujesz dokonać synchronizacji przesunięcia, czy możesz opisać, co dzieje się między zużyciem wiadomości a zatwierdzeniem przesunięcia? Myślę, że to możliwe, że twój klient traci koordynację z bijącym sercem. – Nautilus

Odpowiedz

-1

Jej problem z równoważeniem grupy konsumentów wynika z błędów. Czy możesz nam powiedzieć, z ilu utworzono temat partycji? i ilu konsumentów pracuje? Czy należą do tej samej grupy?

+0

Mam więcej partycji tego członka, dlatego też członek nie powinien odbierać wiadomości z wielu partycji? Mam temat z 20 partycjami i 3 członkami działającymi, wszystkie należą do tej samej grupy konsumenckiej. –

10

Spróbuj również dostosować następujące parametry:

  • heartbeat.interval.ms - To mówi Kafka czekać określoną ilość milisekund przed rozważyć konsument będzie uważany za „martwy”
  • max.partition.fetch.bytes - Ograniczy to ilość wiadomości (do), które konsument otrzyma podczas odpytywania.

Zauważyłem, że ponowne zrównoważenie występuje, jeśli konsument nie zobowiązuje się do Kafki przed upływem czasu bicia serca. Jeśli zatwierdzenie nastąpi po przetworzeniu komunikatów, czas ich przetworzenia określi te parametry. Zmniejszenie liczby wiadomości i zwiększenie czasu bicia serca pomoże uniknąć ponownego zrównoważenia.

Należy również rozważyć użycie większej liczby partycji, więc będzie więcej wątków przetwarzających dane, nawet z mniejszą liczbą wiadomości na sondę.

Napisałem tę małą aplikację do testów. Mam nadzieję, że to pomoże.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x oferuje teraz nowy parametr kontrolować ilość otrzymanych wiadomości: - max.poll.records - Maksymalna liczba rekordów zwracana w pojedyncze wywołanie poll().

UPDATE

Kafka oferuje możliwość pauzie kolejce. Podczas gdy kolejka jest wstrzymana, możesz przetwarzać wiadomości w oddzielnym wątku, co pozwala zadzwonić pod numer KafkaConsumer.poll(), aby wysłać bicie serca. Następnie wywołaj KafkaConsumer.resume() po zakończeniu przetwarzania. W ten sposób można złagodzić problemy związane z przywracaniem równowagi z powodu braku wysyłania pulsu. Oto zarys tego, co możesz zrobić:

while(true) { 
    ConsumerRecords records = consumer.poll(Integer.MAX_VALUE); 
    consumer.commitSync(); 

    consumer.pause(); 
    for(ConsumerRecord record: records) { 

     Future<Boolean> future = workers.submit(() -> { 
      // Process 
      return true; 
     }); 


     while (true) { 
      try { 
       if (future.get(1, TimeUnit.SECONDS) != null) { 
        break; 
       } 
      } catch (java.util.concurrent.TimeoutException e) { 
       getConsumer().poll(0); 
      } 
     } 
    } 

    consumer.resume(); 
} 
+0

Wersja 0.10.x ma teraz nowy parametr * max.poll.records *, który ma być używany zamiast max.partition.fetch.bytes. – ajkret

+0

Użyłem tego samego podejścia, co w Pause i Resume, ale nadal otrzymuję ten sam błąd. Jedyną różnicą jest wywołanie metody commitSync() po pause() i przed wznowieniu(), ponieważ muszę je zatwierdzić tylko wtedy, gdy rekordy są przetwarzane. Każdy pomysł, co robię źle? –

+0

@ mav3n: Spotykam się z tym samym problemem. Próbowałem zwiększyć sesję session.timeout.ms i max.poll.records, ale bez powodzenia. Czy znalazłeś metodę, aby to zrobić? –