2013-04-30 24 views
8

Piszę aplikację, która wymaga, aby uruchomić szereg zadań równolegle i następnie pojedyncze zadania z wynikami wszystkich zadań uruchomić:Uruchamianie zadania po wszystkie zadania zostały zakończone

@celery.task 
def power(value, expo): 
    return value ** expo 

@celery.task 
def amass(values): 
    print str(values) 

To bardzo wymyślny i uproszczony przykład, ale miejmy nadzieję, że punkt dobrze się sprawdza. Zasadniczo mam wiele elementów, które trzeba uruchomić przez power, ale chcę tylko uruchomić amass na wyniki wszystkich zadań. Wszystko to powinno się odbywać asynchronicznie i nie potrzebuję niczego z metody amass.

Czy ktoś wie, jak ustawić to w selera, aby wszystko było wykonywane asynchronicznie i pojedynczego wywołania zwrotnego z listą wyników jest wywoływana po wszystkim jest powiedziane i zrobione?

mam ustawić ten przykład, aby uruchomić z chord Alexander Afanasiev zalecane:

from time import sleep 

import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

Niestety, w powyższym przykładzie, wszystkie zadania w tasks są uruchamiane tylko wtedy, gdy metoda chord nazywa. Czy istnieje sposób, aby każde zadanie można było uruchomić osobno, a następnie mógłbym dodać wywołanie zwrotne do grupy, aby działało, gdy wszystko się skończy?

Odpowiedz

3

Oto rozwiązanie, które działało do moich celów:

tasks.py:

from time import sleep 

import random 

@celery.task 
def power(value, expo): 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 
    return value ** expo 

@celery.task 
def amass(results, tasks): 
    completed_tasks = [] 
    for task in tasks: 
     if task.ready(): 
      completed_tasks.append(task) 
      results.append(task.get()) 

    # remove completed tasks 
    tasks = list(set(tasks) - set(completed_tasks)) 

    if len(tasks) > 0: 
     # resend the task to execute at least 1 second from now 
     amass.delay(results, tasks, countdown=1) 
    else: 
     # we done 
     print results 

przypadków użycia:

tasks = [] 

for i in xrange(10): 
    tasks.append(power.delay(i, 2)) 

amass.delay([], tasks) 

Co to powinny zrobić to początek wszystkie zadania jak najszybciej asynchronicznie. Gdy wszystkie zostaną wysłane do kolejki, zadanie amass zostanie również wysłane do kolejki. Wykonanie zadania będzie kontynuowane, dopóki wszystkie pozostałe zadania nie zostaną zakończone.

+0

Witam, wygląda na to, że jest to dobre podejście, przynajmniej koncepcyjne. Jednak kiedy wypróbowałem go, dokładnie taki sam kod jak powyżej, zgłasza poniżej błąd: 'EncodeError: nie jest JSON serializowany' Naprawdę doceniam pomoc tutaj . – qre0ct

+0

Ok, rozwiązałem powyższy błąd, przekazując final_task() listę bezpośrednio ID zadania, zamiast przekazać mu listę obiektów zadań, jak to jest zrobione w powyższym przykładzie kodu. Mimo to dzięki za odpowiedź. To bardzo pomogło. – qre0ct

3

Selery ma plenty of tools dla większości workflow, które możesz sobie wyobrazić.

Wygląda na to, że musisz skorzystać z chord. Oto cytat z docs:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

+0

Jest to z pewnością słuszne, ale jest z tym problem. Zaktualizowałem swoją odpowiedź ze szczegółami. –

0

Odpowiedzią że @ aleksander-Afanasiev dał Ci to zasadniczo prawo: użyj akord.

Twój kod jest OK, ale tasks.append(power.s((i, 2))) faktycznie nie wykonuje podzadania, tylko dodaje subtasks do listy. Jest to chord(...)(...) ten, który wysyła tyle komunikatów do brokera, jak podzadania zdefiniowane na liście tasks, oraz jeszcze jeden komunikat dla podzadania wywołania zwrotnego. Gdy zadzwonisz pod numer chord, nastąpi to tak szybko, jak to możliwe.

Jeśli chcesz wiedzieć, kiedy akord skończył, możesz sondować do końca, tak jak w przypadku pojedynczego zadania z użyciem r.ready() w twojej próbce.

+0

Chcę, aby każde podzadanie było wykonywane zaraz po wysłaniu, a nie po opublikowaniu akordu. Czy to jest możliwe? –

+0

Cóż, po prostu wykonaj 'power.delay (i, 2)' w pętli i odpytuj wszystkie pośrednie wyniki dla zakończenia przed wywołaniem 'amass (wyniki)'. Ale tak naprawdę nie widzę sensu. Używanie akordu wykona podzadania 'power.s', gdy tylko będą dostępne jako komunikaty w brokerze i' amass' po ich zakończeniu. Myślę, że powinieneś wyjaśnić, co chcesz osiągnąć, ponieważ wydaje się, że twoje pragnienie wykonania zadań asynchronicznie przeczy używanemu przez ciebie użyciu. – enlavin

+0

Wymyśliłem rozwiązanie powyżej, które pokazuje, co chciałem zrobić. –

0

Biorąc okiem na ten urywek z Twojego pytania, to wygląda na to, że są przechodzącą list jako nagłówek cięciwy, a nie group:

from time import sleep 
import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

przekształcając list do group powinno skutkować w zachowaniu spodziewasz się:

... 

callback = amass.s() 

tasks = group(tasks) 

r = chord(tasks)(callback) 
Powiązane problemy