2013-03-01 11 views
10

Zgodnie z samouczkiem selerem dotyczącym real-time monitoring of celery workers można również programowo uchwycić zdarzenia wygenerowane przez pracowników i podjąć odpowiednie działania.Jak monitorować zdarzenia od pracowników w aplikacji Celery-Django?

Moje pytanie brzmi: jak mogę zintegrować monitor z przykładem w przykładzie this w aplikacji Celery-Django?

EDIT: Przykładowy kod w samouczku wygląda następująco:

from celery import Celery 

def my_monitor(app): 
    state = app.events.State() 

    def announce_failed_tasks(event): 
     state.event(event) 
     task_id = event['uuid'] 

     print('TASK FAILED: %s[%s] %s' % (
      event['name'], task_id, state[task_id].info(),)) 
    with app.connection() as connection: 
     recv = app.events.Receiver(connection, handlers={ 
       'task-failed': announce_failed_tasks, 
       'worker-heartbeat': announce_dead_workers, 
     }) 
     recv.capture(limit=None, timeout=None, wakeup=True) 

if __name__ == '__main__': 
    celery = Celery(broker='amqp://[email protected]//') 
    my_monitor(celery) 

Więc chcę uchwycić task_failed zdarzenia wysłane przez pracownika, a także uzyskać jego task_id jak poradnik pokazuje, aby uzyskać wynik dla to zadanie z zaplecza, które zostało skonfigurowane dla mojej aplikacji i dalej ją przetwarzać. Moim problemem jest to, że nie jest dla mnie oczywiste, jak uzyskać aplikację, ponieważ w projekcie django-selekcji nie jest dla mnie przejrzyste tworzenie biblioteki Selera.

Jestem również otwarty na wszelkie inne pomysły dotyczące przetwarzania wyników, gdy pracownik zakończył wykonywanie zadania.

+0

myślę, że trzeba być nieco bardziej szczegółowe, jakie zdarzenia muszą przechwytywania? Czy masz przykład kodu? – danodonovan

Odpowiedz

14

Ok, znalazłem sposób na zrobienie tego, choć nie jestem pewien, czy to jest rozwiązanie, ale działa dla mnie. Funkcja monitora w zasadzie łączy się bezpośrednio z brokerem i nasłuchuje różnych typów zdarzeń. Mój kod wygląda następująco:

from celery.events import EventReceiver 
from kombu import Connection as BrokerConnection 

def my_monitor: 
    connection = BrokerConnection('amqp://guest:[email protected]:5672//') 

    def on_event(event): 
     print "EVENT HAPPENED: ", event 

    def on_task_failed(event): 
     exception = event['exception'] 
     print "TASK FAILED!", event, " EXCEPTION: ", exception 

    while True: 
     try: 
      with connection as conn: 
       recv = EventReceiver(conn, 
           handlers={'task-failed' : on_task_failed, 
              'task-succeeded' : on_event, 
              'task-sent' : on_event, 
              'task-received' : on_event, 
              'task-revoked' : on_event, 
              'task-started' : on_event, 
              # OR: '*' : on_event 
              }) 
      recv.capture(limit=None, timeout=None) 
    except (KeyboardInterrupt, SystemExit): 
     print "EXCEPTION KEYBOARD INTERRUPT" 
     sys.exit() 

To wszystko. I uruchamiam to w innym procesie niż normalna aplikacja, co oznacza, że ​​tworzę proces potomny mojej aplikacji do selera, która tylko uruchamia tę funkcję. HTH

+0

Cześć, dziękuję, twoje pytanie jest w zasadzie to, co próbuję teraz zrobić. Gdzie umieszczasz ten kod w swoim projekcie Django? Czy możesz wyjaśnić, jak stworzyć proces potomny swojej aplikacji do selera? W tej chwili moja aplikacja do selera jest konfigurowana w 'myproj/myproj/celery.py' (zgodnie z http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery -with-django) – fpghost

+1

Cześć! Nie pracowałem nad tym od dłuższego czasu, więc wszystko mogło się zmienić w samym Selerum w ciągu ostatnich n wydań. Zasadniczo zacząłem proces demona Python jak: daemon_process = Process (target = result_processing.my_monitor) daemon_process.daemon = True daemon_process.start() w jednym z modułów, który jest wywoływany, gdy aplikacja się uruchamia – Clara

+0

Używałem Django i zainicjował ten monitor przez to. W Django <1.9 byłem w stanie rozpocząć monitorowanie w pliku 'proj/proj/celery.py', po prostu' my_monitor (app) 'po zdefiniowaniu aplikacji selera. Teraz w Django 1.9, który powoduje 'AppRegistryNotReady' exc (myślę, że importowanie modeli z' __init __. Py' aplikacji jest teraz niedozwolone --- powinienem zauważyć, że mój monitor opiera się na niektórych modelach). Zakończyłem uruchamianie monitora w metodzie 'AppConfig.ready()' aplikacji django, której modele opierały się na moim monitorze (dzięki temu aplikacja zakończyła rejestrację). HTH – fpghost

4

Strzeż się kilka pułapek

  1. Musisz ustawić CELERY_SEND_EVENTS flagę jako prawdziwy w swojej selera config.
  2. Możesz także ustawić monitor zdarzeń w nowym wątku od swojego pracownika.

Oto moja realizacja:

class MonitorThread(object): 
    def __init__(self, celery_app, interval=1): 
     self.celery_app = celery_app 
     self.interval = interval 

     self.state = self.celery_app.events.State() 

     self.thread = threading.Thread(target=self.run, args=()) 
     self.thread.daemon = True 
     self.thread.start() 

    def catchall(self, event): 
     if event['type'] != 'worker-heartbeat': 
      self.state.event(event) 

     # logic here 

    def run(self): 
     while True: 
      try: 
       with self.celery_app.connection() as connection: 
        recv = self.celery_app.events.Receiver(connection, handlers={ 
         '*': self.catchall 
        }) 
        recv.capture(limit=None, timeout=None, wakeup=True) 

      except (KeyboardInterrupt, SystemExit): 
       raise 

      except Exception: 
       # unable to capture 
       pass 

      time.sleep(self.interval) 

if __name__ == '__main__': 
    app = get_celery_app() # returns app 
    MonitorThread(app) 
    app.start() 
Powiązane problemy