5

Jestem nowicjuszem w Kafka 0.9 i testowaniem niektórych funkcji Zdałem sobie sprawę z dziwnego zachowania w implementowanym przez Java Konsumencie (KafkaConsumer).Metoda poll() Kafki Consumer zostaje zablokowana

Broker Kafka znajduje się w zewnętrznej maszynie Ambari.

Nawet jeśli mógłbyś zaimplementować producenta i zacząć wysyłać wiadomości do zewnętrznego brokera, nie mam pojęcia dlaczego, kiedy konsument próbuje odczytać zdarzenia (sondowanie), utknie.

Wiem, że producent działa dobrze, ponieważ mogę spożywać wiadomości przez konsolę (która działa lokalnie na ambari). Ale kiedy wykonuję Java Consumer, nic się nie dzieje, po prostu utknie. Debugowanie kodu I widział, że to dostaje zablokowane na linii poll():

ConsumerRecords<String, String> records = consumer.poll(100); 

Timeout nie robi nic, by the way. Nie ma znaczenia, jeśli wstawisz 0, 100 lub 1000 ms, konsument zostanie zablokowany w tej linii i nie przekroczy limitu czasu ani nie wyrzuci wyjątków.

próbowałem wszelkiego rodzaju właściwości alternatywnych, takich jak advertised.host.name, advertised.listener ... i tak dalej, z zerowym szczęścia.

Każda pomoc będzie bardzo ceniona. Z góry dziękuję!

+0

Czy jesteś w stanie konsumować wiadomości w inny sposób, np. Za pomocą 'kafka-console-consumer.sh'? –

+0

Tak, jestem. Z maszyny, która jest hostem ambari, mogę spożywać wiadomości za pośrednictwem konsolowego klienta. –

+0

A co z maszyną, na której działa użytkownik? Czy wypróbowałeś tam konsola? –

Odpowiedz

1

Przyczyną może być maszyna, na której uruchomiony jest kod konsumenta, nie może połączyć się z zookeeper. Spróbuj uruchomić ten sam kod konsumenta na komputerze, na którym zainstalowana jest twoja Kafka (próbowałem tego i pracowałem dla mnie). Rozwiązałem również problem, wspominając poniższe właściwości w pliku server.properties: advertised.host.name="ip address which you want to expose" // w moim przypadku jest to publiczne IP komputera ec2, mam kafka i zookeepera zainstalowane na tym samym ec2. advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); Powyższe stwierdzenie nie oznacza, że ​​konsument wygaśnie po 100 ms, jest to okres odpytywania. Wszelkie dane przechwycone w ciągu 100 ms są odczytywane w zbiorze rekordów.