2015-01-27 13 views

Java Kafka producer error

I wrote a Kafka producer in Java, but it reports an error on the console. The Kafka server runs on AWS and the producer runs on my Mac. The Kafka server is reachable from my machine. When I send a message from the producer, the Kafka server logs "Accepted connection ..". What is the problem? This is the producer output:

1 [main] INFO kafka.utils.VerifiableProperties - Verifying properties 
28 [main] INFO kafka.utils.VerifiableProperties - Property  metadata.broker.list is overridden to xxxxxx:9092 
28 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder 
137 [main] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host: xxxxxx,port:9092 with correlation id 0 for 1 topic(s) Set(words_topic) 
189 [main] ERROR kafka.producer.SyncProducer - Producer connection to xxxxxx:9092 unsuccessful 
198 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(words_topic)] from broker [id:0,host: xxxxxx,port:9092] failed 
199 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics [Set(words_topic)] from broker [ArrayBuffer(id:0,host: xxxxxx,port:9092)] failed 

And this is the Kafka server console:

[2015-01-27 05:23:33,767] DEBUG Accepted connection from /xxxxx on /xxxx:9092. sendBufferSize [actual|requested]: [212992|1048576] recvBufferSize [actual|requested]: [212992|1048576] (kafka.network.Acceptor) 
[2015-01-27 05:23:33,767] DEBUG Processor 1 listening to new connection from /xxxx:65307 (kafka.network.Processor) 
[2015-01-27 05:23:33,872] INFO Closing socket connection to /xxxx. (kafka.network.Processor) 
[2015-01-27 05:23:33,873] DEBUG Closing connection from /xxxx:65307 (kafka.network.Processor) 
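
The broker log above shows the TCP connection being accepted and then closed about 100 ms later, so the port itself appears to be open. A minimal sketch of how that reachability could be checked independently of the Kafka client follows. The class name `BrokerReachability`, the method `canConnect`, and the 3000 ms timeout are illustrative assumptions, not part of the original post or of any Kafka API. A successful check only means the port accepts TCP connections; it does not show that a metadata request would succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: checks plain TCP reachability only.
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port opens within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or the host name could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 3000));
    }
}
```

Running it with the broker host and `9092` as arguments prints whether the socket could be opened from the producer machine.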

This is my code:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Class name is a placeholder; the original snippet omitted the class and method declarations.
public class WordsProducer {

    // First paragraph from Franz Kafka's Metamorphosis
    private static final String METAMORPHOSIS_OPENING_PARA =
            "One morning, when Gregor Samsa woke from troubled dreams, "
            + "he found himself transformed in his bed into a horrible "
            + "vermin. He lay on his armour-like back, and if he lifted "
            + "his head a little he could see his brown belly, slightly "
            + "domed and divided by arches into stiff sections.";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "?????:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
            // Break the paragraph into words and send each one to "words_topic"
            for (String word : METAMORPHOSIS_OPENING_PARA.split("\\s")) {
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("words_topic", word);
                producer.send(data);
            }
            System.out.println("Produced data");
        } finally {
            // Close the producer even if a send fails
            producer.close();
        }
    }
}

Are you able to produce messages to the same topic using the console script shipped with the Kafka distribution? – user2720864
