2015-04-07 10 views
14

Łączę się z Kafką przy użyciu biblioteki klientów kafli 0.8.2.1. Mogę z powodzeniem łączyć się z Kafką, ale chcę z wdziękiem poradzić sobie z upadkiem Kafki. Oto moja konfiguracja:Jak mogę z wdziękiem poradzić sobie z awarią Kafki?

kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); 
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3"); 
producer = new KafkaProducer(kafkaProperties); 

Kiedy Kafka jest w dół, pojawia się następujący błąd w moich dziennikach:

WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75] 
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75] 
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na] 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] 

Błąd ten powtarza się w nieskończonej pętli i blokuje moją aplikację Java. Próbowałem różnych ustawień konfiguracyjnych związanych z limitami czasu, próbami i potwierdzeniami, ale nie byłem w stanie zapobiec występowaniu tej pętli.

Czy istnieje ustawienie konfiguracyjne, które może temu zapobiec? Czy muszę wypróbować inną wersję klienta? Jak z wdziękiem można poradzić sobie z awarią Kafki?

Odpowiedz

20

zorientowali się, że ta kombinacja ustawień pozwala klientowi Kafka szybko zawieść bez trzymając nitkę lub spamowania kłody:

kafkaProperties.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.TIMEOUT_CONFIG, "300"); 
kafkaProperties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "10000"); 
kafkaProperties.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000"); 

Nie lubię, że klient Kafka posiada gwint podczas próby połączenia do serwer kafka, zamiast być w pełni asynchroniczny, ale to przynajmniej jest funkcjonalne.

+0

Bardzo pomocna, dziękuję. Czy to jest oficjalnie udokumentowane w dowolnym miejscu? – maxenglander

+0

@maxenglander: Nie, że widziałem. Musiałem zacząć przeglądać kod i było wiele prób i błędów. –

+0

"zamiast być w pełni asynchroniczny" – vbence

0

W kliencie 0.9 istnieje również właściwość max.block.ms, która ograniczy czas, w którym klient może pracować.

Powiązane problemy