2013-06-06 19 views
23

Domyślnie Seler wysłać wszystkie zadania „seler” kolejce, ale można to zmienić dodając dodatkowy parametr:Jak wysłać okresowe zadania do konkretnej kolejki w Seler

@task(queue='celery_periodic') 
def recalc_last_hour(): 
    log.debug('sending new task') 
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example 

ustawień harmonogramu:

CELERYBEAT_SCHEDULE = { 
    'installer_recalc_hour': { 
     'task': 'stats.installer.tasks.recalc_last_hour', 
     'schedule': 15 # every 15 sec for test 
    }, 
} 
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler" 
pracownik

Run:

python manage.py celery worker -c 1 -Q celery_periodic -B -E 

Ten schemat nie działa zgodnie z oczekiwaniami: tym pracownikom okresowe wysyła zadania t o kolejka "selera", a nie "selekcja selera". Jak mogę to naprawić?

P.S. seler == 3.0.16

+0

może to być pomocne? http://docs.celeryproject.org/en/latest/userguide/routing.html – oleg

+0

Wyłączyłem opcję CELERYBEAT_SCHEDULER (wykorzystany stoper na podstawie pliku) i działa ona poprawnie. –

Odpowiedz

17

znalazłem rozwiązanie tego problemu:

1) Przede wszystkim zmienił sposób konfigurowania okresowych zadań. Kiedyś @periodic_task dekorator tak:

@periodic_task(run_every=crontab(minute='5'), 
       queue='celery_periodic', 
       options={'queue': 'celery_periodic'}) 
def recalc_last_hour(): 
    dt = datetime.utcnow() 
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \ 
       - timedelta(hours=1) 
    log.debug('Generating task for hour %s', str(prev_hour)) 
    recalc_hour.delay(prev_hour) 

2) napisałem celery_periodic dwukrotnie params do @periodic_task:

  • kolejki = 'celery_periodic' opcja służy gdy wywołujesz zadanie z kodu (.delay lub .apply_async)

  • opcje = {'kolejka': 'celery_periodic'} Opcja jest używana, gdy wywołuje selera.

Jestem pewien, że to samo jest możliwe, jeśli chcesz skonfigurować okresowe zadania ze zmienną CELERYBEAT_SCHEDULE.

UPD. To rozwiązanie jest poprawne zarówno w przypadku bazy danych, jak i baz danych dla pliku CELERYBEAT_SCHEDULER.

+0

@periodic_task (jeśli zrobię to dobrze) są teraz przestarzałe – StErMi

+0

@StErMi yep, one nie działają. – iankit

25

Okresowe są wysyłane do kolejek przez celerybeat. Możesz zrobić wszystko, co robimy za pomocą aplkera Celery. Oto lista konfiguracji dostarczanych z selerem.

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

W twoim przypadku

CELERYBEAT_SCHEDULE = { 
    'installer_recalc_hour': { 
     'task': 'stats.installer.tasks.recalc_last_hour', 
     'schedule': 15 # every 15 sec for test, 
     'options': {'queue' : 'celery_periodic'} ##options are mapped to apply_async options 
    }, 
} 
+7

To powinna być zaakceptowana odpowiedź? – Buttons840

+0

Cóż, oba pytania odpowiadają w pewnym sensie ... Zgadzam się jednak, że ta odpowiedź jest nieco lepsza od zaakceptowanej, ponieważ pytanie autora poprosiło o zmianę CELERYBEAT_SCHEDULE ... – DejanLekic

2

A jeśli używasz djcelery planującego bazy danych, można określić kolejkę na Execution Options -> pola kolejki

Powiązane problemy