2012-09-21 12 views
7

Używam celery w mojej aplikacji do uruchamiania zadań okresowych. Zobaczmy prosty przykład poniżejOddzwonienie do selera apply_async

from myqueue import Queue 
@perodic_task(run_every=timedelta(minutes=1)) 
def process_queue(): 
    queue = Queue() 
    uid, questions = queue.pop() 
    if uid is None: 
     return 

    job = group(do_stuff(q) for q in questions) 
    job.apply_async() 

def do_stuff(question): 
    try: 
     ... 
    except: 
     ... 
     raise 

Jak widać w powyższym przykładzie, używam celery uruchomić zadanie asynchronicznej, ale (ponieważ jest to kolejka) muszę zrobić queue.fail(uid) w przypadku wyjątku w do_stuff lub queue.ack(uid) inaczej . W tej sytuacji bardzo jasne i przydatne byłoby wywoływanie zwrotne z mojego zadania w obu przypadkach - on_failure i on_success.

Widziałem niektóre documentation, ale nigdy nie widziałem praktyk korzystania z callbacków z apply_async. czy jest to możliwe do zrobienia?

Odpowiedz

26

podklasa klasy Task i przeciążać funkcje on_success i on_failure:

class CallbackTask(Task): 
    def on_success(self, retval, task_id, args, kwargs): 
     pass 

    def on_failure(self, exc, task_id, args, kwargs, einfo): 
     pass 


@celery.task(base=CallbackTask) # this does the trick 
def add(x, y): 
    return x + y