2013-10-08 15 views
5

Chcę przetwarzać komunikaty z kolejki rabbitMq równolegle. Kolejka jest skonfigurowana jako autoAck = false. Korzystam z obsługi wielbłąd-królikMQ dla camel endpoints, która obsługuje parametr threadPoolSize, ale nie ma pożądanego efektu. Wiadomości są nadal przetwarzane seryjnie poza kolejką, nawet jeśli threadpoolsize = 20.Konsumpcja równoległa Rabbit Mq java klienta

Z debugowania kodu widzę, że parametr threadpoolsize służy do utworzenia ExecutorService, która jest używana do przekazania do królestwa ConnectionFactory zgodnie z opisem here. Wszystko wygląda dobrze, dopóki nie dostaniesz się do królika ConsumerWorkService. Tutaj wiadomości są przetwarzane w bloku o maksymalnej wielkości 16 wiadomości. Każda wiadomość w bloku jest przetwarzana seryjnie, a następnie, jeśli jest więcej pracy do wykonania, usługa executora wywoływana jest z następnym blokiem. Fragment kodu tego jest poniżej. Z tego użycia usługi executora nie rozumiem, jak można przetwarzać komunikaty równolegle. Wykonawca ma do wykonania tylko jedną pracę do wykonania na raz.

Czego mi brakuje? Dokumentacja

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

Czy można skonfigurować ConsumerWorkService, aby używała innego rozmiaru bloku? –

+1

Cześć Claus, Wprowadziłem kilka zmian w komponencie Camel-rabbitmq przez github jako Fergus Nelson. Wprowadziłem zmiany w RabbitMqConsumer, aby skonfigurować kanał dla każdego wymaganego użytkownika. Stworzę prośbę o ściągnięcie Jira +, gdy przetestuję wszystko. –

+0

@ mR_fr0g, jak rozumiem, naprawiłeś problem, tworząc wiele kanałów w komponencie Camel-RabbitMQ. Czy mógłbyś podać link do biletu Jira, polecenie ściągnięcia i określić, w jakiej wersji wersji Camel jest obecna? – wheleph

Odpowiedz

3

RabbitMQ nie jest bardzo jasne, o tym jednak, mimo że ConsumerWorkService korzysta z puli wątków, to basen nie wydaje się być wykorzystywane w sposób przetwarzać wiadomości równolegle:

Każdy kanał ma własny wątek wysyłki. W przypadku najczęstszych przypadków użycia jednego konsumenta na kanał oznacza to, że konsumenci nie zatrzymują innych konsumentów. Jeśli masz wielu Konsumentów na kanał, pamiętaj, że długoletni Konsument może wstrzymać wysyłkę zwrotną do innych Konsumentów na tym kanale.

(http://www.rabbitmq.com/api-guide.html)

Dokumentacja ta sugeruje użycie jednego Channel na wątek i, w rzeczywistości, jeśli po prostu utworzyć wiele Channel S jako wymaganego poziomu współbieżności, komunikaty będą wysyłane między konsumentów związanych z te kanały.

Testowałem z dwoma kanałami i konsumentami: gdy 2 wiadomości są w kolejce, każdy konsument wybiera tylko jedną wiadomość na raz. Bloki 16 wiadomości, o których wspomniałeś, nie przeszkadzają, co jest dobre.

W rzeczywistości Spring AMQP tworzy również kilka kanałów do jednoczesnego przetwarzania komunikatów. Odbywa się to poprzez:

Ja również przetestowane to, że działa zgodnie z oczekiwaniami.

3

Jeśli masz pojedynczą instancję Channel, będzie ona wywoływać zarejestrowanych klientów seryjnie, jak prawidłowo wykryto przez sprawdzenie ConsumerWorkService. Są dwa sposoby na pokonanie tego:

  1. Użyj wielu kanałów zamiast jednego.
  2. Użyj pojedynczego kanału, ale zastosuj konsumentów w specjalny sposób. Powinni po prostu wybrać przychodzącą wiadomość z kolejki i umieścić ją jako zadanie w wewnętrznej puli wątków.

Więcej szczegółów można znaleźć pod numerem this post.