2014-05-12 23 views
9

Używam kilka cat | zgrep komend na zdalnym serwerze i zbieranie ich wyjście indywidualnie do dalszej obróbki:Python: wykonać koci podproces równolegle

class MainProcessor(mp.Process): 
    def __init__(self, peaks_array): 
     super(MainProcessor, self).__init__() 
     self.peaks_array = peaks_array 

    def run(self): 
     for peak_arr in self.peaks_array: 
      peak_processor = PeakProcessor(peak_arr) 
      peak_processor.start() 

class PeakProcessor(mp.Process): 
    def __init__(self, peak_arr): 
     super(PeakProcessor, self).__init__() 
     self.peak_arr = peak_arr 

    def run(self): 
     command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" ' 
     log_lines = (subprocess.check_output(command, shell=True)).split('\n') 
     process_data(log_lines) 

To jednak powoduje sekwencyjne wykonanie podproces ("ssh ... cat ... ") komend. Drugi szczyt czeka na pierwszy koniec i tak dalej.

Jak mogę zmodyfikować ten kod, aby wywołania podprocesowe działały równolegle, nadal będąc w stanie zebrać dane wyjściowe dla każdego z osobna?

+0

'--mmap' jest bezużyteczny podczas czytania z potoku ... – twalberg

Odpowiedz

-1

Innym podejściem (zamiast drugiej sugestią postawienia procesów Shell w tle) jest użycie multithreading.

Metoda run że trzeba będzie wtedy zrobić coś takiego:

thread.start_new_thread (myFuncThatDoesZGrep) 

Aby zebrać wyniki, możesz zrobić coś takiego:

class MyThread(threading.Thread): 
    def run(self): 
     self.finished = False 
     # Your code to run the command here. 
     blahBlah() 
     # When finished.... 
     self.finished = True 
     self.results = [] 

Uruchom wątek zgodnie z powyższym opisem w łączu na multithr eading. Kiedy twój obiekt wątku ma wartość myThread.finished == True, możesz zebrać wyniki za pomocą myThread.results.

+0

z tym podejściem, w jaki sposób można uzyskać dane wyjściowe każdego z nich po zakończeniu pracy wątków? I już używam procesu, dlaczego wątek działa, ale nie proces? – liarspocker

+0

Proces będzie działał - inna podana odpowiedź sugeruje, że wykonujesz pracę wieloprocesową w rzeczywistej powłoce, używając &. W tym podejściu masz tylko jeden proces Pythona, ale uruchamia on wiele procesów powłoki. W podejściu wielowątkowym masz wiele procesów Pythona, ale jeden proces powłoki na proces Pythona. Aby zebrać wyniki z wielu wątków, należy utworzyć klasy z podklasą Wątek. Następnie umieść wyniki z jednego wątku jako dane obiektu w tej klasie. – FrobberOfBits

+0

Ale czy to nie to, co robi powyższy kod? Zaczynam nowy proces dla każdego piku, następnie uruchamiam podproces i process_data z jego "metody uruchamiania". – liarspocker

24

Nie trzeba ani multiprocessing ani threading uruchomić podprocesów równolegle np .:

#!/usr/bin/env python 
from subprocess import Popen 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True) 
      for i in range(5)] 
# collect statuses 
exitcodes = [p.wait() for p in processes] 

uruchamia komendy 5 shell jednocześnie. Uwaga: w tym miejscu nie są używane nici ani moduł multiprocessing. Nie ma sensu dodawać znaków ampersand i & do poleceń powłoki: Popen nie czeka na zakończenie polecenia. Musisz wyraźnie zadzwonić pod numer .wait().

Jest to wygodne, ale to nie jest konieczne stosowanie nici do zbierania wyjście z podprocesów:

#!/usr/bin/env python 
from multiprocessing.dummy import Pool # thread pool 
from subprocess import Popen, PIPE, STDOUT 

# run commands in parallel 
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True, 
        stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) 
      for i in range(5)] 

# collect output in parallel 
def get_lines(process): 
    return process.communicate()[0].splitlines() 

outputs = Pool(len(processes)).map(get_lines, processes) 

pokrewne: Python threading multiple bash subprocesses?.

Oto przykład kodu, który pobiera moc od kilku podprocesów jednocześnie w tym samym wątku:

#!/usr/bin/env python3 
import asyncio 
import sys 
from asyncio.subprocess import PIPE, STDOUT 

@asyncio.coroutine 
def get_lines(shell_command): 
    p = yield from asyncio.create_subprocess_shell(shell_command, 
      stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
    return (yield from p.communicate())[0].splitlines() 

if sys.platform.startswith('win'): 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 

# get commands output in parallel 
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"' 
        .format(i=i, e=sys.executable)) for i in range(5)] 
print(loop.run_until_complete(asyncio.gather(*coros))) 
loop.close() 
+0

@ j-f-sebastian Umm ... Nie jestem pewien, jaka jest różnica między fragmentami kodu # 2 i # 3 w Twojej odpowiedzi. Czy możesz wskazać jakiś zasób lub wyjaśnić, co znaczy "pobiera wynik ... ** w tym samym wątku **"? BTW, bardzo dziękuję za # 2 :) –

+1

@SaheelGodhane: 'oparte na wieloprocesowości.dummy.Pool()' oparte rozwiązanie wykorzystuje * wiele * (kilka/więcej niż jeden) wątków. 'rozwiązanie oparte na asyncio używa tu * pojedynczego * wątku. Aby zrozumieć, jak wykonać kilka rzeczy naraz * w tym samym wątku *, zobacz [Python Współbieżność od podstaw: ŻYJ!] (Http://www.youtube.com/watch?v=MCs5OvhV9S4) – jfs

+0

Doskonały przykład! Próbowałem zaimplementować urywki nr 1 z nową funkcją subprocess.run(), ale wygląda na to, że to nie zadziała, ponieważ ta funkcja zawsze czeka na zakończenie procesu. Musiałem zamiast tego wrócić do używania Popen. – Jared

Powiązane problemy