Ostatnio zauważyłem, że Camel ma teraz swój własny komponent dla Kafki, więc postanowiłem dać mu wir.Integracja Camel Kafka
postanowiłem spróbować piękny prosty plik -> temat Kafka następująco ...
<route>
<from uri="file:///tmp/input" />
<setHeader headerName="kafka.PARTITION_KEY">
<constant>Test</constant>
</setHeader>
<to uri="kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1" />
</route>
To wydaje się dość proste, jednak na prowadzeniu to mam ...
java.lang.ClassCastException: java.lang.String cannot be cast to [B
at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
a na kontroli kodu Camel, to jednak następujące ...
String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);
Oczywiście, jest to problem serializacji, jestem po prostu nie masz pewności, czy istnieje obejście, czy też jest to błąd z istniejącą implementacją? (Lub, mam nadzieję, tylko moje nieporozumienie)
Jakieś sugestie? Dzięki, J