2016-03-09 9 views
6

Uaktualniamy naszą implementację kafka do wersji .9 i używamy nowego konsumenta java api do tworzenia konsumenta. Używam poniższego kodu dla konsumenta i używamy tematu ustawiania dla konsumenta, jak w LINII A a LINE B jest wezwaniem do naszej usługi, która przetwarza otrzymywane wiadomości. Problem polega na tym, że otrzymujemy wyjątek, jeśli przetwarzanie naszych wiadomości zajmuje więcej niż 30 sekund.uaktualnienie kafka do .9 z nowym klientem api

Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "test-group"); 
      props.put("auto.offset.reset", "earliest"); 
      props.put("heartbeat.interval.ms", "1000"); 
      props.put("receive.buffer.bytes", 10485760); 
      props.put("fetch.message.max.bytes", 5242880); 
      props.put("enable.auto.commit", false); 
    //with partition assigned to consumer 


      KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props); 
      // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0); 
      //consumer.assign(Arrays.asList(partition0)); 
      //assign topic to consumer without partition 
//LINE A 
      consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp()); 
      List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
      while (true) { 
       try { 
        ConsumerRecords<Object, Object> records = consumer.poll(1000); 
        consumeFromQueue(records);//LINE B 
        consumer.commitSync(); 

       } catch (CommitFailedException e) { 
        e.printStackTrace(); 
        System.out.println("CommitFailedException"); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        System.out.println("Exception in while consuming messages"); 
       } 

Wyjątek jest

2016-03-03 10: 47: 35,095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator: Znakowanie koordynatora 2147483647 martwy. 2016-03-03 10: 47: 35.096 BŁĄD 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator: Błąd ILLEGAL_GENERATION wystąpił podczas zatwierdzania przesunięć dla grupy TEST-GROUP CommitFailedException org.apache.kafka.clients. consumer.CommitFailedException: nie można zakończyć zatwierdzenia z powodu rebalansu grupy w org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:552) w org.apache.kafka.clients.consumer. internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:493) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:665) at org.apache.kafka.clients. consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess (RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals. RequestFuture.fireSuccess (RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ RequestFutureCompletionHandler.onComplete (ConsumerNetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll (ConsumerNetworkClient. java: 320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetwo rkClient.java:213)

Powyższy wyjątek przychodzi podczas dokonywania kompensacji. Wszelkie sugestie pomogłyby Ci podziękować

Odpowiedz

6

Dzieje się tak dlatego, że nowy konsument jest jednowątkowy, a jedynym sposobem na utrzymanie pulsu w grupie konsumentów jest odpytywanie lub zatwierdzanie przesunięcia, po 30 sekundach koordynator grupy oznacza, że ​​konsument jest martwy i wzywa do przywrócenia równowagi grupy. W tej sytuacji można zwiększyć liczbę request.timeout.ms lub podzielić pracę i przetwarzanie między 2 wątkami.

+0

Dzięki za odpowiedzi, próbowałem dodając „request.timeout.ms” do 70000, dała mi ten sam wyjątek, mimo że przetwarzanie wiadomości zajęło 30000. –

+0

Nie wiem, dlaczego nie przynosi żadnego efektu. Próbowałem nawet ustawić "session.timeout.ms" na 70000, ale dał mi wyjątek mówiąc "org.apache.kafka.common.errors.ApiException: limit czasu sesji nie mieści się w dopuszczalnym zakresie." Myślałem o twojej drugiej sugestii, ale jeśli coś pójdzie nie tak podczas przetwarzania wątków, jak sobie z tym poradzę? dodając go ponownie do tematu jako nową wiadomość i przywracając zmiany spowodowane przez wiadomość przed wyjątkiem? –

+0

Zwiększ także wartość group.max.session.timeout.ms. – Nautilus

0

Można ograniczyć liczbę komunikatów zwracanych przez poll() poprzez ustawienie

max.partition.fetch.bytes 

do jakiegoś odpowiedniego progu, który jest większy niż największy wiadomości, ale tak niska, że ​​dostaniesz mniej wiadomości na sondzie.

Kafka 0.10.x posiada wsparcie wyraźnie ograniczyć liczbę wiadomości zwracane do klienta poprzez ustawienie

max.poll.records 
Powiązane problemy