Chcę przetwarzać komunikaty z kolejki rabbitMq równolegle. Kolejka jest skonfigurowana jako autoAck = false. Korzystam z obsługi wielbłąd-królikMQ dla camel endpoints, która obsługuje parametr threadPoolSize, ale nie ma pożądanego efektu. Wiadomości są nadal przetwarzane seryjnie poza kolejką, nawet jeśli threadpoolsize = 20.Konsumpcja równoległa Rabbit Mq java klienta
Z debugowania kodu widzę, że parametr threadpoolsize służy do utworzenia ExecutorService, która jest używana do przekazania do królestwa ConnectionFactory zgodnie z opisem here. Wszystko wygląda dobrze, dopóki nie dostaniesz się do królika ConsumerWorkService
. Tutaj wiadomości są przetwarzane w bloku o maksymalnej wielkości 16 wiadomości. Każda wiadomość w bloku jest przetwarzana seryjnie, a następnie, jeśli jest więcej pracy do wykonania, usługa executora wywoływana jest z następnym blokiem. Fragment kodu tego jest poniżej. Z tego użycia usługi executora nie rozumiem, jak można przetwarzać komunikaty równolegle. Wykonawca ma do wykonania tylko jedną pracę do wykonania na raz.
Czego mi brakuje? Dokumentacja
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
Czy można skonfigurować ConsumerWorkService, aby używała innego rozmiaru bloku? –
Cześć Claus, Wprowadziłem kilka zmian w komponencie Camel-rabbitmq przez github jako Fergus Nelson. Wprowadziłem zmiany w RabbitMqConsumer, aby skonfigurować kanał dla każdego wymaganego użytkownika. Stworzę prośbę o ściągnięcie Jira +, gdy przetestuję wszystko. –
@ mR_fr0g, jak rozumiem, naprawiłeś problem, tworząc wiele kanałów w komponencie Camel-RabbitMQ. Czy mógłbyś podać link do biletu Jira, polecenie ściągnięcia i określić, w jakiej wersji wersji Camel jest obecna? – wheleph