2016-12-14 11 views
6

Mam niestandardowego obiektu Java i chcą wykorzystać JVM jest w wybudowanym serializacji, aby wysłać go do tematu Kafki, ale nie serializacji z błędem poniżejWyślij do klienta Java Objects Kafka Temat

org.apache.kafka. common.errors.SerializationException: nie można konwertować wartość klasy com.spring.kafka.Payload do klasy określonym org.apache.kafka.common.serialization.ByteArraySerializer w value.serializer

ładunku. java

public class Payload implements Serializable { 

    private static final long serialVersionUID = 123L; 

    private String name="vinod"; 

    private int anInt = 5; 

    private Double aDouble = new Double("5.0"); 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAnInt() { 
     return anInt; 
    } 

    public void setAnInt(int anInt) { 
     this.anInt = anInt; 
    } 

    public Double getaDouble() { 
     return aDouble; 
    } 

    public void setaDouble(Double aDouble) { 
     this.aDouble = aDouble; 
    } 

} 

Podczas tworzenia producenta, mam następujące właściwości ustawione

<entry key="key.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 
       <entry key="value.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 

Moja wyślij invoke jest poniżej

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload())); 

Co jest poprawny sposób wysyłania niestandardowych obiektów java przez producenta do tematu kafka?

+0

Innym rozwiązaniem jest konwersja do formatu JSON i wysłać – ravthiru

Odpowiedz

8

Mamy 2 opcje wymienione poniżej

1) Jeśli mamy zamiar wysłać niestandardowych obiektów Javy do producenta, musimy stworzyć serializatora który implementuje org.apache.kafka.common.serialization.Serializer i przekazać tę klasę serializer podczas tworzenia swojego producenta

Kod odniesienia poniżej

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { 

    public void configure(Map map, boolean b) { 

    } 

    public byte[] serialize(String s, Object o) { 

     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    public void close() { 

    } 
} 

i ustawić wartość serializatora odpowiednio

<entry key="value.serializer" 
         value="com.spring.kafka.PayloadSerializer" /> 

2) Nie ma potrzeby tworzenia niestandardowej klasy serializera. Użyj istniejącego ByteArraySerializer, ale podczas wysyłania śledzić proces

Java Object -> String (Preferrably JSON represenation zamiast toString) -> tablicaBitowa

3

Ponieważ używasz ByteArraySerializer, musisz utworzyć instancję producenta bajtów [].

Producer<byte[],byte[]> producer = new KafkaProducer<>(props); 

a następnie przy produkcji przekazać byte [] po szeregowania lub jakimś innym sposobem, na przykład,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes())); 

Jeśli jesteś przejazdem tylko obiektu ładunku do producenta to będzie lepiej mieć serializator i serializator wartości jako coś, co zamierzasz przekazać i podczas czytania musisz odczytać z tych danych.

Zaleca się używanie Serializable i ByteArraySerializer/ByteArrayDeserializer.