2013-05-04 11 views
9

Próbuję użyć selera do planowania i uruchamiania zadań na flocie serwerów. Każde zadanie trwa nieco dłużej (kilka godzin) i wymaga użycia podprocesu do wywołania określonego programu z danymi wejściowymi. Ten program wytwarza dużo danych wyjściowych zarówno w standardowym, jak i stderrze.Strumień w selerze

Czy jest jakiś sposób, aby pokazać wyniki produkowane przez program klientowi w czasie zbliżonym do rzeczywistego? Przesyłać strumieniowo dane wyjściowe, aby klient mógł oglądać wyniki wypuszczane przez zadanie uruchomione na serwerze bez logowania się do serwera?

+0

Jakie jest twoje zadanie związane z selerem podczas wykonywania innego programu? Czy używasz 'subprocess.call'? –

+0

Yup tylko podproces.call. –

Odpowiedz

11

Nie określono wielu wymagań i ograniczeń. Zakładam, że już gdzieś masz instancję redis.

Co można zrobić, to przeczytać wyjście z drugiej linii technologicznej po linii i opublikować go przez REDiS:

Oto przykład, gdzie można echo dane do pliku /tmp/foo badania:

import redis 
redis_instance = redis.Redis() 
p = subprocess.Popen(shlex.split("tail -f /tmp/foo"), stdout=subprocess.PIPE) 

while True: 
    line = p.stdout.readline() 
    if line: 
     redis_instance.publish('process log', line) 
    else: 
     break 

W oddzielnym procesie:

import redis 

redis_instance = redis.Redis() 
pubsub = redis_instance.pubsub() 
pubsub.subscribe('process log') 

while True: 
    for message in pubsub.listen(): 
     print message # or use websockets to comunicate with a browser 

Jeśli chcesz proces do końca, można na przykład wyślij "porzuć" po wykonaniu zadania selera.

Możesz użyć różnych kanałów (ciąg w subscribe), aby oddzielić dane wyjściowe od różnych procesów.

Można również zapisać dane wyjściowe dziennika w REDiS, jeśli chcesz,

redis_instance.rpush('process log', message) 

a później odzyskać w całości.

5

Jedynym sposobem, widzę jak to zrobić, to napisać niestandardowy Logger, który będzie używany do stderr i stdout (patrz docs:

from celery.app.log import Logger 
Logger.redirect_stdouts_to_logger(MyLogger()) 

rejestratora można zapisać dane do bazy danych, Memcached, Redis lub cokolwiek wspólne składowanie będziesz używać, aby uzyskać dane

nie jestem pewien o strukturze logger, ale myślę, że coś jak to będzie działać.

from logging import Logger 

class MyLogger(Logger): 
    def log(lvl, msg): 
     # Do something with the message 
+0

@anand_trex czy próbowałeś tego? – AJP

+0

Nie bardzo mi się kojarzę z rejestrowaniem i przesyłaniem wyników. Ponieważ to jest hack, nie zamierzam używać tego rozwiązania. –

2

To stare pytanie, ale nadal jest to jedyny wynik w tym konkretnym temacie.

Oto jak poszedłem o tym, Stworzyłem prostą plikopodobny obiekt, który zamieszcza się do konkretnego kanału nad Redis

class RedisFileObject(object): 
    def __init__(self, _key): 
     self.connection = redis.Redis() 
     self.key = _key 
     self.connection.publish('debug', 'Created channel %s' % self.key) 

    def write(self, data): 
     self.connection.publish(self.key, data) 

    def close(self): 
     pass 

Mam BaseTask z których każdy z moich zadań dziedziczy różne funkcje wew. ten, który zastępuje stdout i stderr obiektem podobnym do pliku Redis.

def capture_output(self): 
    sys.stdout = RedisFileObject(self.request.id) 
    sys.stderr = RedisFileObject(self.request.id) 

Stamtąd wszystko zapisane na stdout/stderr zostanie przekazane do kanału Redis nazwanego po identyfikatorze zadania.