Próbuję pracować z interfejsem kafka API w języku Java. Używam następującej zależności:Jak mogę tworzyć wiadomości za pomocą API Kafka 8.2 w Javie?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
Mam problem z połączeniem się ze zdalnym serwerem kafka. Zmieniłem atrybut portu pliku kafka 'server.properties' na port 8080. Mogę uruchomić zarówno serwer zookeeper, jak i serwer kafka bez problemu. Mogę również używać konsolowych aplikacji producenta i aplikacji konsumenckich dołączonych do pobrania kafka. (Scala wersja 2.10)
Używam następujący kod klienta, aby utworzyć zdalnego KafkaProducer
Properties propsProducer = new Properties();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);
Po Utworzyłem producenta, mogę uruchomić następującą linię i uzyskać ważne informacje temat powrócił, przyznana strTopic jest nazwą istniejącego tematu.
List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);
Gdy próbuję wysłać wiadomość, mam następujące:
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());
RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
Wezwanie do send() bloki w nieskończoność, a kiedy ręcznie zakończyć proces, widzę, że gniazdo ERROR Zamknięcie z powodu błędu na serwerze kafka (błąd IOException, Connection Reset by Peer).
Co więcej, nic nie jest warte, aby właściwości host.name, advertised.host.name i advertised.port nadal były skomentowane w pliku "server.properties". Aha, i jeśli mogę zmienić linię:
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
do
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");
i uruchomić go na tym samym serwerze, na którym jest zainstalowany serwer Kafka, to działa, ale próbuję z nim pracować zdalnie.
Doceniam każdą pomoc i jeśli mogę wyjaśnić w ogóle, daj mi znać.
Czy używasz '172.xx.xx.xxx' jako adresu IP hosta? –
Nie, to pełne IP x są tylko maskami. –
Kk. Być może problem z firewallem? Czy można sprawdzić poprawność połączenia sieciowego na porcie 8080 przy użyciu netcata? –