Newbie dla RabbitMQ i nowe dla Javy.Jak obsługiwać powiadomienie o anulowaniu konsumenta RabbitMQ podczas korzystania z Spring ChannelAwareMessageListener
Próbuję napisać listener, który będzie używać ręcznych pakietów i obsługiwać powiadomienia o anulowaniu klienta przy użyciu abstrakcji AMQP w Java Spring. Czy mogę wykonać oba zadania za pomocą abstrakcji wiosennej?
Chcę napisać listener, który będzie pobierał wiadomości z kolejki i przetwarzał tę wiadomość (może zapisywać do bazy danych lub coś podobnego). Planowałem korzystać z ręcznych potwierdzeń, więc jeśli przetwarzanie komunikatu nie powiedzie się lub nie można go z jakiegoś powodu zakończyć, mogę je odrzucić i zażądać. Do tej pory myślę, że znalazłem, że aby ręcznie ack/nack/odrzuć przy użyciu Spring AMQP muszę użyć ChannelAwareMessageListener
.
Zdaję sobie sprawę, że powinienem obsługiwać Powiadomienia o Anulowaniu Konsumentów od RabbitMQ, jednak przy użyciu ChannelAwareMessageListener
nie widzę sposobu, aby to zakodować. Jedynym sposobem, w jaki radzę sobie z CCN, jest napisanie kodu przy użyciu interfejsu API klienta na niższym poziomie przez wywołanie channel.basicConsume()
i przekazanie nowej instancji DefaultConsumer
, która umożliwia obsługę dostarczania wiadomości i jej anulowanie.
Również nie widzę, jak ustawić clientProperties
na ConnectionFactory
(aby powiedzieć brokerowi, czy mogę obsłużyć CCN), ponieważ otrzymuję fabrykę z komponentu bean w konfiguracji.
Mój pseudo kod słuchacza i tworzenie kontenera znajduje się poniżej.
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}