2017-07-13 26 views
9

Używam Spring-Kafka w wersji 1.2.1, a gdy serwer Kafka jest wyłączony/nieosiągalny, asynchroniczne wysyłanie blokuje przez pewien czas. Wydaje się, że jest to limit czasu TCP. Kod jest coś takiego:Spring Kafka asynchroniczne wysyłanie bloków blokowych

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); 
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { 
    @Override 
    public void onSuccess(SendResult<K, V> result) { 
     ... 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     ... 
    } 
}); 

Zrobiłem naprawdę szybki rzut oka na kod Wiosna-Kafka i wydaje się po prostu przekazać zadanie wraz z biblioteką klienta Kafka, tłumaczenia interakcji oddzwonienia do przyszłości interakcja z obiektem. Patrząc na bibliotekę klienta kafki, kod staje się bardziej złożony i nie potrzebowałem czasu, aby to wszystko zrozumieć, ale przypuszczam, że może on wykonywać wywołania zdalne (przynajmniej metadane?) W tym samym wątku.

Jako użytkownik oczekiwałem metod Spring-Kafka, które zwracają przyszłość, aby natychmiast powrócić, nawet jeśli zdalny serwer kafka jest nieosiągalny.

Każde potwierdzenie, jeśli moje zrozumienie jest błędne lub jeśli jest to błąd, byłoby mile widziane. W końcu skończyłem na asynchronicznym zakończeniu.

Innym problemem jest fakt, że dokumentacja Springa-Kafki mówi na początku, że zapewnia synchroniczne i asynchroniczne metody wysyłania. Nie mogłem znaleźć żadnych metod, które nie zwracają przyszłości, może dokumentacja wymaga aktualizacji.

W razie potrzeby chętnie udzielę dalszych informacji. Dzięki.

Odpowiedz

1

Tylko dla pewności. Czy masz zastosowanie do adnotacji @EnableAsync? Chcę powiedzieć, że to może być klucz do określenia zachowania Future <>

+0

Dziękuję za odpowiedź. Nie Nie używam tej adnotacji, nie było w niej nic w dokumentacji. Spróbuję i dam ci znać, jeśli to rozwiąże problem. –

+0

Korzystanie z @EnableAsync niestety niczego nie zmieniło =/ –

4

Oprócz adnotacji @EnableAsync na klasie konfiguracyjnej, należy użyć adnotacji @sync w metodzie, w której wywołałeś ten kod.

http://www.baeldung.com/spring-async

Oto niektóre fragements kod. Kafka producent config:

@EnableAsync 
@Configuration 
public class KafkaProducerConfig { 

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); 

    @Value("${kafka.brokers}") 
    private String servers; 

    @Bean 
    public Map<String, Object> producerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     return props; 
    } 

    @Bean 
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { 
     return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); 
    } 

    @Bean 
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { 
     return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); 
    } 

    @Bean 
    public Producer producer() { 
     return new Producer(); 
    } 
} 

A sam producent:

public class Producer { 

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); 

    @Autowired 
    private KafkaTemplate<String, GenericMessage> kafkaTemplate; 

    @Async 
    public void send(String topic, GenericMessage message) { 
     ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); 
     future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { 

      @Override 
      public void onSuccess(final SendResult<String, GenericMessage> message) { 
       LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(final Throwable throwable) { 
       LOGGER.error("unable to send message= " + message, throwable); 
      } 
     }); 
    } 
} 
+0

Dziękuję za odpowiedź. Nie Nie używam tych adnotacji, nie było w nich nic w dokumentacji. Spróbuję obydwu i dam ci znać, jeśli to rozwiąże problem. –

+0

Korzystanie z EnableAsync niestety niczego nie zmieniło. Również z linku rozumiem, że jest to biblioteka wiosenno-kafka, która powinna używać adnotacji asynchronicznej, ponieważ zapewnia mi przyszły obiekt. –

+0

Zgadzam się z tobą, dla mnie to nie ma sensu, że zapewniasz przyszłość, ale i tak muszę umieścić adnotacje. W naszym przypadku umieszczenie tych dwóch adnotacji sprawiło, że działał jak urok. Zmienię odpowiedź dodając fragmenty kodu. –