2015-06-11 15 views
9

Próbuję utworzyć prostą aplikację startową wiosnę z buta sprężyny, które „produkują” wiadomości do RabbitMQ wymiany/kolejki i innymi próbki bagażnika sprężyna app że „konsumować” te wiadomości. Więc mam dwie aplikacje (lub microservices jeśli chcesz). 1) "producent" mikroserwis 2) "konsument" mikroserwiswiosna bagażnika RabbitMQ MappingJackson2MessageConverter niestandardowy obiekt konwersja

"Producent" ma 2 obiekty domeny. Foo i Bar, które należy przekonwertować na json i wysłać do rabbitmq. „Konsument” należy odbierać i konwertować wiadomości json do domeny i Foo Bar odpowiednio. Z jakiegoś powodu nie mogę wykonać tego prostego zadania. Nie ma zbyt wielu przykładów na ten temat. dla wiadomości konwerter chcę używać org.springframework.messaging.converter.MappingJackson2MessageConverter

Oto co mam do tej pory:

PRODUCENT MICROSERVICE

package demo.producer; 

import org.springframework.amqp.core.Binding; 
import org.springframework.amqp.core.BindingBuilder; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.core.TopicExchange; 
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
public class ProducerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ProducerApplication.class, args); 
    } 

    @Bean 
    Queue queue() { 
     return new Queue("queue", false); 
    } 

    @Bean 
    TopicExchange exchange() { 
     return new TopicExchange("exchange"); 
    } 

    @Bean 
    Binding binding(Queue queue, TopicExchange exchange) { 
     return BindingBuilder.bind(queue).to(exchange).with("queue"); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Autowired 
    private Sender sender; 

    @Override 
    public void run(String... args) throws Exception { 
     sender.sendToRabbitmq(new Foo(), new Bar()); 
    } 
} 

@Service 
class Sender { 

    @Autowired 
    private RabbitMessagingTemplate rabbitMessagingTemplate; 
    @Autowired 
    private MappingJackson2MessageConverter mappingJackson2MessageConverter; 

    public void sendToRabbitmq(final Foo foo, final Bar bar) { 

     this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter); 

     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo); 
     this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar); 

    } 
} 

class Bar { 
    public int age = 33; 
} 

class Foo { 
    public String name = "gustavo"; 
} 

MICROSERVICE KONSUMENTA

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Autowired 
    private Receiver receiver; 

    @Override 
    public void run(String... args) throws Exception { 

    } 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

I tu jest wyjątek dostaję:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message 
Endpoint handler details: 
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)] 
Bean [[email protected]] 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message 
    ... 13 common frames omitted 
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}] 
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115) 
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127) 
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100) 
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113) 
    ... 12 common frames omitted 

Wyjątkiem mówi nie ma konwerter, i to prawda, mój problem jest to, że nie mam pojęcia jak ustawić MappingJackson2MessageConverter konwerter po stronie konsumentów (proszę pamiętać, że chcę użyć org.springframework .messaging.converter.MappingJackson2MessageConverter i nie org.springframework.amqp.support.converter.JsonMessageConverter)

Wszelkie myśli?

Na wszelki wypadek można wybulić ten przykładowy projekt w: https://github.com/gustavoorsi/rabbitmq-consumer-receiver

+0

Spójrz tutaj: http://stackoverflow.com/questions/29337550/using-rabbitlistener-with-jackson2jsonmessageconverter – stalet

+0

W tym przykładzie używa ** org.springframework.amqp.support.converter.Jackson2JsonMessageConverter ** (co należy do dependency spring-amqp), w moim przypadku chcę użyć ** org.springframework.messaging.converter.MappingJackson2MessageConverter ** (który należy do wiadomości wiosennych). – Gustavo

Odpowiedz

12

Ok, ja wreszcie dostałem tej pracy.

Wiosna wykorzystuje PayloadArgumentResolver wyodrębnić, konwersji i ustawić przekształca komunikat parametru sposobu z uwagami @RabbitListener. Jakoś musimy ustawić mappingJackson2MessageConverter do tego obiektu.

Tak więc w aplikacji KONSUMENTA musimy wdrożyć RabbitListenerConfigurer.Nadrzędnymi configureRabbitListeners (RabbitListenerEndpointRegistrar rejestratora) możemy ustawić niestandardowy DefaultMessageHandlerMethodFactory, do tej fabryki ustawiamy konwerter wiadomość, a fabryka stworzy naszą PayloadArgumentResolver z właściwego przeliczenia.

Oto fragment kodu, zaktualizowałem również git project.

ConsumerApplication.java

package demo.consumer; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; 
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.converter.MappingJackson2MessageConverter; 
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; 
import org.springframework.stereotype.Service; 

@SpringBootApplication 
@EnableRabbit 
public class ConsumerApplication implements RabbitListenerConfigurer { 

    public static void main(String[] args) { 
     SpringApplication.run(ConsumerApplication.class, args); 
    } 

    @Bean 
    public MappingJackson2MessageConverter jackson2Converter() { 
     MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); 
     return converter; 
    } 

    @Bean 
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { 
     DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
     factory.setMessageConverter(jackson2Converter()); 
     return factory; 
    } 

    @Override 
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { 
     registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); 
    } 

    @Autowired 
    private Receiver receiver; 

} 

@Service 
class Receiver { 
    @RabbitListener(queues = "queue") 
    public void receiveMessage(Foo foo) { 
     System.out.println("Received <" + foo.name + ">"); 
    } 

    @RabbitListener(queues = "queue") 
    public void receiveMessage(Bar bar) { 
     System.out.println("Received <" + bar.age + ">"); 
    } 
} 

class Foo { 
    public String name; 
} 

class Bar { 
    public int age; 
} 

Tak więc, jeśli uruchomić microservice Producent doda 2 wiadomości w kolejce. Jeden reprezentujący obiekt Foo i inny reprezentujący obiekt Bar. Po uruchomieniu mikroserwisu konsumenta zobaczysz, że oba są zużywane przez odpowiednią metodę w klasie Odbiornik.


Updated problem:

Jest problem konceptualny o kolejkach z mojej strony myślę. To, co chciałem osiągnąć, nie może być możliwe, deklarując 2 metody opatrzone komentarzem: @RabbitListener, które wskazują na tę samą kolejkę. Powyższe rozwiązanie nie działa poprawnie. Jeśli wyślesz do rabbitmq, powiedzmy, 6 wiadomości Foo i 3 wiadomości Bar, nie będą odbierane 6 razy przez słuchacza z parametrem Foo. Wydaje się, że detektor jest wywoływany równolegle, więc nie ma możliwości rozróżnienia, który słuchacz powinien wywołać na podstawie typu argumentu method. Moje rozwiązanie (i nie jestem pewien, czy to jest najlepszy sposób, jestem otwarty na sugestie tutaj) jest utworzenie kolejki dla każdej jednostki. Więc teraz mam queue.bar i queue.foo i zaktualizuj @RabbitListener (kolejki = „queue.foo”) Po raz kolejny został uaktualniony kod i można to sprawdzić w mój git repository.

+1

1.5.0 (obecnie w punkcie kontrolnym 1) obsługuje [class-level '@ RabbitListener'] (https://spring.io/blog/2015/05/08/spring-amqp-1-4-5-release-and -1-5-0-m1-dostępny) wraz z '@ RabbitHandler' na poszczególnych metodach wspierających ten przypadek użycia. –

+0

to miło wiedzieć. dzięki ! – Gustavo

0

nie zostało to zrobione siebie, ale wydaje się, że trzeba się zarejestrować odpowiedni konwersji poprzez utworzenie RabbitTemplate. Zajrzyj do sekcji 3.6.2 w this Spring documentation. Wiem, że jest skonfigurowany przy użyciu klas AMQP, ale jeśli klasa komunikatów, o której wspomniałeś, jest zgodna, nie ma powodu, dla którego nie możesz go zastąpić. Wygląda na to, że this reference wyjaśnia, jak możesz to zrobić, używając konfiguracji Java zamiast XML. Tak naprawdę nie używałem Królika, więc nie mam żadnych osobistych doświadczeń, ale chciałbym usłyszeć, co się dowiaduje.

Powiązane problemy