2014-09-08 28 views
5

Ostatnio zauważyłem, że Camel ma teraz swój własny komponent dla Kafki, więc postanowiłem dać mu wir.Integracja Camel Kafka

postanowiłem spróbować piękny prosty plik -> temat Kafka następująco ...

<route> 
     <from uri="file:///tmp/input" /> 
     <setHeader headerName="kafka.PARTITION_KEY"> 
      <constant>Test</constant> 
     </setHeader> 
     <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" /> 
</route> 

To wydaje się dość proste, jednak na prowadzeniu to mam ...

java.lang.ClassCastException: java.lang.String cannot be cast to [B 
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) 
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78) 

a na kontroli kodu Camel, to jednak następujące ...

String msg = exchange.getIn().getBody(String.class); 
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); 
producer.send(data); 

Oczywiście, jest to problem serializacji, jestem po prostu nie masz pewności, czy istnieje obejście, czy też jest to błąd z istniejącą implementacją? (Lub, mam nadzieję, tylko moje nieporozumienie)

Jakieś sugestie? Dzięki, J

Odpowiedz

10

Ach, nieważne, zaczynamy ... Mam nadzieję, że to pomoże komuś innemu, musisz ustawić serializator w opcjach.

<route> 
      <from uri="file:///tmp/input" /> 
      <setHeader headerName="kafka.PARTITION_KEY"> 
       <constant>Test</constant> 
      </setHeader> 
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" /> 
</route> 
0

Znaleziony piękny przykład dla zainstalowaniu i uruchomieniu Apache Kafkę i konfigurowania punktu końcowego na wielbłądzie do wysyłania wiadomości do Kafki topic-

@Override 
    public void configure() throws Exception { 

     String topicName = "topic=javainuse-topic"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString(); 

     from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka); 
    } 

Reference- Apache Camel + Kafka Integration example