2016-10-06 26 views
5

Zgodnie z dostarczoną dokumentacją here, próbuję na POC, aby uzyskać wiadomości do odbiornika, jak wspomniano w same documentation, Poniżej opisano, jak napisałem konfigurację.Wiosenna integracja Kafka Odbiornik konsumentów nie Odbiera wiadomości

@Configuration 
public class KafkaConsumerConfig { 

    public static final String TEST_TOPIC_ID = "record-stream"; 

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}") 
    private String topic; 

    @Value("${kafka.address:localhost:9092}") 
    private String brokerAddress; 


    /* 
     @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
     KafkaMessageListenerContainer<String, String> container) { 
     KafkaMessageDrivenChannelAdapter<String, String> 
     kafkaMessageDrivenChannelAdapter = new 
     KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record); 
     kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return 
     kafkaMessageDrivenChannelAdapter; } 

     @Bean public QueueChannel received() { return new QueueChannel(); } 
    */ 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(3); 
     factory.getContainerProperties().setPollTimeout(30000); 
     return factory; 

    } 

    /* 
    * @Bean public KafkaMessageListenerContainer<String, String> container() 
    * throws Exception { ContainerProperties properties = new 
    * ContainerProperties(this.topic); // set more properties return new 
    * KafkaMessageListenerContainer<>(consumerFactory(), properties); } 
    */ 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest 
                     // smallest 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 

} 

i Listener jest jak poniżej,

@Service 
public class Listener { 

    private Logger log = Logger.getLogger(Listener.class); 


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory") 
    public void process(String message/* , Acknowledgment ack */) { 
     Gson gson = new Gson(); 
     Record record = gson.fromJson(message, Record.class); 

     log.info(record.getId() + " " + record.getName()); 
     // ack.acknowledge(); 
    } 

} 

chociaż jestem produkujących wiadomości do tego samego tematu i to konsument pracuje na tym samym temacie, słuchacz nie jest wykonywany.

Używam Kafki 0.10.0.1, a tutaj jest mój obecny pom. Ten konsument działa jako aplikacja internetowa wiosennego rozruchu, w przeciwieństwie do wielu próbek linii poleceń.

<dependencies> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-web</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-kafka</artifactId> 
      <version>2.1.0.RELEASE</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 

     </dependency> 

     <dependency> 
      <groupId>com.google.code.gson</groupId> 
      <artifactId>gson</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 

    </dependencies> 

spędziłem sporą ilość czasu, aby dowiedzieć się, dlaczego ten słuchacz nie jest trafiony, gdy wątek ma wiadomości, co to robię źle.

Wiem, że mogę odbierać wiadomości za pomocą kanału (skomentowałem część konfiguracji tego w kodzie), ale tutaj współbieżność jest czysta.

Czy tego rodzaju implementacja jest możliwa z asynchronicznym zużyciem wiadomości.

Odpowiedz

3

Musisz dodać @EnableKafka wraz z @Configuration.

Czy pojawi się add jakiś opis.

Tymczasem:

@Configuration 
@EnableKafka 
public class KafkaConsumerConfig { 
+0

Myślałem EnableIntegration jest dbanie o to, w moim Producer, nie mam EnableKafka. Pozwól, że spróbuję i powrócę do Ciebie. –

+0

'@ EnableIntegration' jest dla Spring Integration, ale mówimy tutaj o Spring Kafka. Są to całkowicie różne niezależne projekty. Look '@ KafkaListener' jest poza Spring Integration Kafka. Podobnie jak pamiętaj, istnieją '@ EnableJms',' @ EnableRabbit' i wiele innych '@Enable ...'. Jedyne '@ EnableIntegration' nie dba o wszystkie z nich. To nie jest odpowiedzialność. –

+0

Wielkie dzięki za wgląd, twoje działa jak czar. Będę o tym pamiętać. –

Powiązane problemy