2016-12-13 14 views
5

Używam Celery 4.0.1 z Django 1.10 i mam zadania planowania awarii (uruchomienie zadania działa poprawnie). Oto konfiguracja seler:Ustawianie zadań okresowych w Celery (celerybeat) dynamicznie za pomocą add_periodic_task

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') 
app = Celery('myapp') 

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST) 
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery' 
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default' 
app.conf.CELERY_TASK_SERIALIZER = 'json' 
app.conf.CELERY_ACCEPT_CONTENT = ['json'] 
app.conf.CELERY_IGNORE_RESULT = True 
app.conf.CELERY_DISABLE_RATE_LIMITS = True 
app.conf.BROKER_POOL_LIMIT = 2 

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'), 
    Queue('myapp.queue1'), 
    Queue('myapp.queue2'), 
    Queue('myapp.queue3'), 
) 

Następnie w tasks.py mam:

@app.task(queue='myapp.queue1') 
def my_task(some_id): 
    print("Doing something with", some_id) 

W views.py Chcę zaplanować to zadanie:

def my_view(request, id): 
    app.add_periodic_task(10, my_task.s(id)) 

Potem wykonać polecenia:

sudo systemctl start rabbitmq.service 
celery -A myapp.celery_app beat -l debug 
celery worker -A myapp.celery_app 

Ale zadanie jest nigdy nie zaplanowane. Nie widzę niczego w dziennikach. Zadanie działa, ponieważ jeśli w mojej opinii mam:

def my_view(request, id): 
    my_task.delay(id) 

Zadanie jest wykonywane.

Jeśli w moim pliku konfiguracyjnego jeśli zaplanować zadanie ręcznie, jak to działa:

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

Po prostu nie mogę zaplanować zadanie dynamicznie. Dowolny pomysł?

Odpowiedz

14

Właściwie nie można określić okresowe zadanie nie na poziomie widoku, ponieważ ustawienie harmonogramu bicie zostanie załadowany pierwszy i nie mogą być przesunięte w czasie wykonywania:

Funkcja add_periodic_task() będzie dodać wpis do beat_schedule ustawienie za kulisami, a tym samym ustawienie może również mogą być wykorzystane do utworzenia okresowych zadań ręcznie:

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

co oznacza, jeśli chcesz używać add_periodic_task() powinno być opakowane withi n procedura obsługi on_after_configure na poziomie seler aplikację i wszelkie zmiany na starcie nie wejdą w życie:

app = Celery() 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    sender.add_periodic_task(10, my_task.s(66)) 

Jak wspomniano w doc The regularne celerybeat prostu śledzić wykonanie zadania:

domyślnej scheduler to celery.beat.PersistentScheduler, który po prostu śledzi czasy ostatniego uruchomienia w lokalnym pliku bazy danych półki.

Aby móc dynamicznie zarządzać okresowych zadań i przełożyć celerybeat przy starcie:

Istnieje również rozszerzenie django-celery-beat która przechowuje harmonogram w bazie danych Django i prezentuje wygodny interfejs administratora do zarządzać okresowymi zadaniami w środowisku wykonawczym.

Zadania zostaną utrwalone w bazie danych django, a harmonogram może zostać zaktualizowany w modelu zadania na poziomie db.Za każdym razem, gdy aktualizujesz zadanie okresowe, licznik w tej tabeli zadań będzie zwiększany i będzie wskazywał serwisowi beaser selerowi, aby ponownie załadował harmonogram z bazy danych.

Możliwym rozwiązaniem dla Ciebie może być następująca:

from django_celery_beat.models import PeriodicTask, IntervalSchedule 

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) 
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66])) 

views.py

def update_task_view(request, id) 
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique 
    task.args=json.dumps([id]) 
    task.save() 


EDIT: (13/01/2018)


The najnowszej release 4.1.0 zajęły przedmiot w tej ticket #3958 i została połączona

+0

Picking up na swoim komentarzu „Właściwie nie można określić okresowe zadanie nie na poziomie widoku”: Czy jest możliwe użyć 'add_periodic_task()' na poziomie aplikacji, np. w 'task.py'? Wydaje się, że lepiej hermetyzować, aby zachować te okresowe zadania zadeklarowane w aplikacji. –

+0

Właściwie nie jest konieczne korzystanie z niego, ponieważ zostanie on wywołany, jeśli użyjesz tylko składni ustawień "app.conf.CELLERYBEAT_SCHEDULE", ale jeśli chcesz go użyć, możesz go użyć w 'task.py' plik. – DhiaTN

+2

Wierzę, że najnowsza wersja (po wersji 4.1.0) powinna mieć adresowaną. Oto dev, który się dzieje [# 3958] (https://github.com/celery/celery/pull/3958) –

Powiązane problemy