2012-11-07 15 views
16

Chcę utworzyć grupę z listy zwróconej przez zadanie Celery, aby dla każdego elementu w zestawie wyników zadania jedno zadanie zostało dodane do grupy.Jak powiązać zadanie z selerem, które zwraca listę do grupy?

Oto prosty przykład kodu wyjaśniający przypadek użycia. Numer ??? powinien być wynikiem poprzedniego zadania.

@celery.task 
def get_list(amount): 
    # In reality, fetch a list of items from a db 
    return [i for i in range(amount)] 

@celery.task 
def process_item(item): 
    #do stuff 
    pass 

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???)) 

Jestem prawdopodobnie nie zbliża to poprawnie, ale jestem pewien, że nie jest to bezpieczne, aby zadzwonić do zadań z poziomu zadania:

@celery.task 
def process_list(): 
    for i in get_list.delay().get(): 
     process_item.delay(i) 

nie muszę wynik z zadania sekund .

+0

Rzeczywiście, nie * nie * wywołuje zadania z zadania. To spowoduje zakleszczenia. Powiedzmy, że masz jednego pracownika. Wywołujesz swoje zadanie, które wiąże pracownika 1, a następnie wywołuje drugie zadanie. Nie ma pracownika, który przetworzyłby to zadanie i wszystko się zawiesi. Ta złośliwość staje się nieco lepsza, gdy dodajesz pracowników, ale zawsze będziesz wiązał wielu pracowników jednym zadaniem (i tracisz równoległość). – mlissner

Odpowiedz

29

Możesz uzyskać tego rodzaju zachowanie za pomocą zadania pośredniego. Oto demonstracja tworzenia "podobnej do mapy" metody, która działa tak, jak zasugerowałeś.

from celery import task, subtask, group 

@task 
def get_list(amount): 
    return [i for i in range(amount)] 

@task 
def process_item(item): 
    # do stuff 
    pass 

@task 
def dmap(it, callback): 
    # Map a callback over an iterator and return as a group 
    callback = subtask(callback) 
    return group(callback.clone([arg,]) for arg in it)() 

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s())) 

Chciałbym prosić Solema o udzielenie mi tej sugestii, gdy poprosiłem go o pomoc w podobnej sprawie.

+1

Należy zauważyć, że klon wykonuje płytką kopię. Jeśli chcesz sklonować "złożony" podpis (jak łańcuch, grupa lub akord), musisz albo (ab) użyć deepkopii Pythona, jak wspomniano w [problemie selera 2251] (https://github.com/celery/seler/issues/2251). Lub przeniesiesz 'callback = podzadanie (callback)' do pętli for tworząc funkcje i usuwając 'clone'. –

+0

Przeczytałem powyższy komentarz kilkanaście razy i nie rozumiem. Czy możesz podać przykład @LuisNell? – mlissner

+0

@mlissner Biorąc pod uwagę powyższy kod, mam na myśli następujące. Jeśli założymy, że "oddzwanianie" nie jest po prostu pojedynczym zadaniem, ale raczej złożonym przepływem pracy (grupą lub akordem), nie można po prostu użyć '.klone()'. Grupy i akordy mogą być bardzo złożone (grupa grup itp.). W takim przypadku nie można po prostu użyć '.clone', ponieważ tworzy to tylko płytką kopię twojego podpisu zwrotnego. Oznacza to, że argumenty nie będą poprawnie przekazywane. Aby upewnić się, że wszystko działa zgodnie z oczekiwaniami, musisz użyć 'deepcopy', jak wspomniano w moim oryginalnym komentarzu - czy to czyni go bardziej zrozumiałym? jeśli nie, spróbuję jeszcze raz. –

Powiązane problemy