2014-04-03 10 views
7
  1. DefaultConsumer
    Moje DemoConsumer dziedziczy DefaultConsumer.
    Zauważyłem, że działanie w ten sposób handleDelivery() jest wywoływane z ThreadPool.
    (drukowanie Thread.currentThread(). GetName() Widzę pool-1-thread-1/2/3/4 eachtime.
    Mam również testowałem go kilka razy i widziałem, że zamówienie zostanie zapisany.
    Wystarczy aby upewnić się, że - ponieważ różne wątki zadzwonić dostawy uchwyt - to będzie bałagan moje zamówienieClient RabbitMQ Java Korzystanie DefaultConsumer vs QueueingConsumer

  2. QueueingConsumer
    Wszystkie tutorial java używać QueueingConsumer konsumować wiadomości
    w docs API jest ona wymieniona jako przestarzałej klasie..
    Czy powinienem zmienić mój kod na odziedziczony po DefaultConsumer? Czy to samouczek jest nieaktualny?

Dzięki.

+0

Co masz na myśli mówiąc "czy to zepsuje mój porządek?" ? Kolejność nanoszenia zamówień lub wiadomości? – Gabriele

Odpowiedz

12

Tak, DefaultConsumer używa wewnętrznej puli wątków, którą można zmienić. Korzystanie ExecutorService jak:

ExecutorService es = Executors.newFixedThreadPool(20); 
Connection conn = factory.newConnection(es); 

Czytaj http://www.rabbitmq.com/api-guide.html „Zaawansowane opcje połączenia”.

Jak można odczytać z „QueueingConsumer” doc:

jako takie, to jest teraz bezpiecznie realizować bezpośrednio konsumentowi lub przedłużyć DefaultConsumer.

Nigdy nie użyłem QueueingConsumer, ponieważ nie jest on prawidłowo sterowany zdarzeniami.

Jak widać tutaj:

QueueingConsumer consumer = new QueueingConsumer(channel); 
channel.basicConsume(QUEUE_NAME, true, consumer); 
while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /// here you are blocked, waiting the next message. 
    String message = new String(delivery.getBody()); 
} 

Typowym problemem w tym przypadku jest to, jak zamknąć subskrypcji oraz wspólne obejście jest wysłanie oznaczony ścisły post w lokalnym komputerze. Właściwie to nie lubię tego tak bardzo.

Jeśli przedłużyć DefaultConsumer zamiast można prawidłowo zamknąć subskrypcji i kanał:

public class MyConsumer extends DefaultConsumer {...} 

następnie

public static void main(String[] args) { 
MyConsumer consumer = new MyConsumer (channel); 
String consumerTag = channel.basicConsume(Constants.queue, false, consumer); 
System.out.println("press any key to terminate"); 
System.in.read(); 
channel.basicCancel(consumerTag); 
channel.close(); 
.... 

Podsumowując, nie należy się martwić o kolejności wiadomość, bo jeśli wszystko działa poprawnie, kolejność wiadomości jest poprawna, ale myślę, że nie możesz jej założyć, ponieważ jeśli wystąpi jakiś problem, możesz utracić kolejność wiadomości. Jeśli absolutnie potrzebujesz zachować kolejność wiadomości, powinieneś dołączyć sekwencyjny znacznik, aby zrekonstruować kolejność komunikatów po stronie konsumenta.

Powinieneś także rozszerzyć DefaultConsumer.

+0

Dziękuję bardzo –

+0

Hi, jak zrobić rateLimiter przy rozszerzaniu DefaultConsumer, Ponieważ nie mogę kontrolować, czy otrzymać wiadomość, czy nie, gdy rozszerza DefaultConsumer, ale przy użyciu QueueingConsumer, mogę odmówić konsumpcji przed konsumenta.nextDelivery() – chou