2014-09-30 15 views
5

mamPracownik selera: jak konsumować ze wszystkich kolejek?

  • ustawić CELERY_CREATE_MISSING_QUEUES = True
  • niezdefiniowane CELERY_QUEUES
  • zdefiniowane CELERY_DEFAULT_QUEUE = 'default' (typu direct)
  • niestandardowej klasy routera, który tworzy trasy w locie, jak pokazano na ten bilet (https://github.com/celery/celery/issues/150) .

widzę, że nowa kolejka na trasie zwracanej przez niestandardowej routera zostanie utworzony które zakładam właśnie z powodu CELERY_CREATE_MISSING_QUEUES.

Teraz w węźle pracownikowi, że biegnę, nie przechodzą -Q argument i zużywa tylko z „default” kolejce, która wydaje się być zgodne z dokumentacją -

domyślnie zostanie zużyty ze wszystkich kolejek zdefiniowanych w ustawieniu CELERY_QUEUES (które, jeśli nie zostanie określone, domyślnie jest kolejką o nazwie selery), oznaczoną jako .

Czy istnieje sposób na pobranie mojego węzła roboczego z wszystkich kolejek, w tym tych, które są tworzone dynamicznie?

Dzięki,

Odpowiedz

3

Pracownik musi być poinformowani o tych automatycznie lub dynamicznie tworzonych kolejek, więc potrzebny jest sposób, aby uzyskać te nazwy kolejki i przechowywać je może podczas ich tworzenia lub otrzymać je może z rabbitmqctl list_queues jeśli ciebie Używa RabbitMQ jako brokera, i na przykład dodaje procedurę obsługi sygnału, aby dodać dynamiczne kolejki do pracowników do konsumpcji.

na przykład stosując celeryd_after_setup sygnału:

from celery.signals import celeryd_after_setup 

@celeryd_after_setup.connect 
def add_dynamic_queue(sender, instance, **kwargs): 
    # get the dynamic queue, maybe stored somewhere 
    queue = 'dynamic_queue' 
    instance.app.amqp.queues.select_add(queue) 

Jeśli zawsze mają nowe kolejki dynamiczne tworzone, można również kierować pracowników do rozpoczęcia spożywania tych kolejek w czasie wykonywania przy użyciu:

#command all workers to consume from the 'dynamic_queue' queue 
app.control.add_consumer('dynamic_queue', reply=True) 

# command specific workers 
app.control.add_consumer('dynamic_queue', reply=True, destination=[[email protected]]) 

See Adding Consumers.

Mam nadzieję, że to pomoże, będę edytować pytanie, gdy otrzymam więcej informacji na ten temat.

+0

Dziękuję. To było interesujące. Tak, używam RabbitMQ i planowałem, że zawsze będę miał dynamiczne kolejki. Miałem nadzieję, że uda mi się powiedzieć pracownikowi, by korzystał ze wszystkich kolejek przy użyciu jakiegoś rodzaju symboli wieloznacznych. Zamierzam teraz wypróbować twoje sugestie. Pytanie - Biorąc pod uwagę, że pracownicy mogą być uruchomieni na różnych serwerach, a jedynym pośrednikiem jest broker komunikatów, w jaki sposób app.control.add_consumer (...) przekazuje informacje pracownikom? – ksrini

+0

@ksrini tak Szukałem kodu źródłowego, aby sprawdzić, czy możemy użyć wzorca wieloznacznego, ale metoda ustawiająca kolejki do konsumpcji wymaga listy kolejek oddzielonych przecinkami. A polecenia sterujące używają brokera (RabbitMQ) do rozsyłania wiadomości do robotników, sprawdź moduł 'celery.app.control'. – Pierre

+0

Twoje sugerowane podejście za pomocą polecenia sterującego add_consumer zadziałało! Dzięki!Chciałbym wywołać add_consumer tylko wtedy, gdy pracownicy nie pobierają już z tej kolejki, ponieważ add_consumer zajmuje trochę czasu (około 1-2 sekund), jeśli reply = True i planowałem zrobić to z routera, gdzie ja określam nowa kolejka jako część trasy. Jeśli ustawię reply = False, jest to szybsze. Czy widzisz jakiś problem z wywołaniem add_consumer, mimo że pracownicy mogli już zużywać się z tej kolejki? – ksrini