2015-03-30 8 views
5

Próbuję pracować z interfejsem kafka API w języku Java. Używam następującej zależności:Jak mogę tworzyć wiadomości za pomocą API Kafka 8.2 w Javie?

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

Mam problem z połączeniem się ze zdalnym serwerem kafka. Zmieniłem atrybut portu pliku kafka 'server.properties' na port 8080. Mogę uruchomić zarówno serwer zookeeper, jak i serwer kafka bez problemu. Mogę również używać konsolowych aplikacji producenta i aplikacji konsumenckich dołączonych do pobrania kafka. (Scala wersja 2.10)

Używam następujący kod klienta, aby utworzyć zdalnego KafkaProducer

Properties propsProducer = new Properties(); 

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("topic.metadata.refresh.interval.ms", "0"); 

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

Po Utworzyłem producenta, mogę uruchomić następującą linię i uzyskać ważne informacje temat powrócił, przyznana strTopic jest nazwą istniejącego tematu.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

Gdy próbuję wysłać wiadomość, mam następujące:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); 

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

Wezwanie do send() bloki w nieskończoność, a kiedy ręcznie zakończyć proces, widzę, że gniazdo ERROR Zamknięcie z powodu błędu na serwerze kafka (błąd IOException, Connection Reset by Peer).

Co więcej, nic nie jest warte, aby właściwości host.name, advertised.host.name i advertised.port nadal były skomentowane w pliku "server.properties". Aha, i jeśli mogę zmienić linię:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

do

propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

i uruchomić go na tym samym serwerze, na którym jest zainstalowany serwer Kafka, to działa, ale próbuję z nim pracować zdalnie.

Doceniam każdą pomoc i jeśli mogę wyjaśnić w ogóle, daj mi znać.

+1

Czy używasz '172.xx.xx.xxx' jako adresu IP hosta? –

+0

Nie, to pełne IP x są tylko maskami. –

+0

Kk. Być może problem z firewallem? Czy można sprawdzić poprawność połączenia sieciowego na porcie 8080 przy użyciu netcata? –

Odpowiedz

3

Po wielu kopaniu postanowiłem zaimplementować przykład znaleziony tutaj: Kafka Producer Example. Skróciłem kod i nie zaimplementowałem klasy partycjonera. Zaktualizowałem pom z wymienioną zależnością i nadal miałem ten sam problem. Ostatecznie dokonałem pewnych zmian konfiguracji i wszystko działało.

Ostatnim elementem układanki było określenie serwera Kafka w/etc/hosts zarówno serwera, jak i komputerów klienta. Dodałem następujące elementy do obu plików.

172.xx.xx.xxx  serverHost1 

Ponownie, x to tylko maski. Następnie ustawiłem plik advertised.host.name w pliku server.properties na serverHost1. UWAGA: Mam ten adres IP po uruchomieniu ifconfig na komputerze serwera.

zmieniłem linię

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

do

propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

Kafka API nie podoba się fakt, że byłem wyznaczającą IP jako ciąg znaków.Zamiast tego szukał adresu IP z pliku etc/hosts, chociaż dokumentacja mówi:

"Nazwa hosta brokera będzie reklamować producentów i konsumentów. Jeśli nie jest ustawiona, używa wartości dla" host.name ", jeśli skonfigurowano W przeciwnym razie użyje wartości zwróconej przez java.net.InetAddress.getCanonicalHostName(). "

Które po prostu zwrócą adres IP, w postaci ciągu, który wcześniej był używany, jeśli nie jest zdefiniowany w etc/hosts komputera klienta, w przeciwnym razie zwraca nazwę sparowaną z IP (serverHost1 w moim przypadku). Nigdy też nie ustawiłem wartości nazwa_hosta.

+0

Czy bootstrap.servers jest zamiennikiem metadata.broker.list? –

+1

Tak, wierzę w to. W wersji 0.8.2.0 pole to "metadata.broker.list", ale w nowszych wersjach jest to "boostrap.servers" –

+0

Tak! To prawda. z nowym Programatorem Producenta jest to nowa konfiguracja. –

Powiązane problemy