Chcę utworzyć grupę z listy zwróconej przez zadanie Celery, aby dla każdego elementu w zestawie wyników zadania jedno zadanie zostało dodane do grupy.Jak powiązać zadanie z selerem, które zwraca listę do grupy?
Oto prosty przykład kodu wyjaśniający przypadek użycia. Numer ???
powinien być wynikiem poprzedniego zadania.
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Jestem prawdopodobnie nie zbliża to poprawnie, ale jestem pewien, że nie jest to bezpieczne, aby zadzwonić do zadań z poziomu zadania:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
nie muszę wynik z zadania sekund .
Rzeczywiście, nie * nie * wywołuje zadania z zadania. To spowoduje zakleszczenia. Powiedzmy, że masz jednego pracownika. Wywołujesz swoje zadanie, które wiąże pracownika 1, a następnie wywołuje drugie zadanie. Nie ma pracownika, który przetworzyłby to zadanie i wszystko się zawiesi. Ta złośliwość staje się nieco lepsza, gdy dodajesz pracowników, ale zawsze będziesz wiązał wielu pracowników jednym zadaniem (i tracisz równoległość). – mlissner