2015-07-27 13 views
9

Używam funkcji przetwarzania wieloprocesorowego Pythona do odwzorowania niektórych funkcji na niektóre elementy. Coś w tym stylu:Dlaczego pracownicy wieloprocesorowi Pythona nie umierają?

def computeStuff(arguments, globalData, concurrent=True): 
    pool = multiprocessing.Pool(initializer=initWorker, initargs=(globalData,)) 
    results = pool.map(workerFunction, list(enumerate(arguments))) 
    return results 

def initWorker(globalData): 
    workerFunction.globalData = globalData 

def workerFunction((index, argument)): 
    ... # computation here 

Generalnie uruchamiam testy w ipythonie używając zarówno cPython, jak i Pypy. Zauważyłem, że procesy odradzania często nie są zabijane, więc zaczynają się akumulować, a każdy z nich wykorzystuje gigabajt pamięci RAM. Dzieje się to po naciśnięciu klawisza ctrl-k podczas obliczeń, co powoduje, że przetwarzanie wieloprocesowe staje się wielkim szaleństwem. Ale nawet po zakończeniu obliczeń procesy te nie umrą w Pypy.

Zgodnie z dokumentacją, gdy basen zostanie zebrany śmieci, powinien zadzwonić pod numer terminate() i zabić wszystkie procesy. Co tu się dzieje? Czy muszę jawnie dzwonić pod numer close()? Jeśli tak, czy istnieje jakiś menedżer kontekstów, który właściwie zarządza zamykaniem zasobów (to znaczy procesów)?

Jest to na Mac OS X Yosemite.

+7

Awans na prawdziwy tytuł kapitalisty – percusse

+1

Może po prostu trzeba dodać '' try: ... finally: pool.terminate() ''? –

+0

Może moje pytanie nie jest jasne - mówię, że robotnicy kręcą się nawet po zakończeniu obliczeń. Chociaż nie powinni, jeśli dobrze rozumiem dokumentację, w każdym przypadku. – Ant6n

Odpowiedz

2

Zbiór śmieci PyPy jest leniwy, więc nie zadzwonienie pod numer close oznacza, że ​​Pool jest czyszczone "kiedyś", ale to nie znaczy "w najbliższym czasie".

Po prawidłowym wykonaniu Poolclose d pracownicy kończą pracę, gdy zabraknie im zadań. Łatwym sposobem zapewnienia Pool jest zamknięty w pre-3.3 Python jest:

from contextlib import closing 

def computeStuff(arguments, globalData, concurrent=True): 
    with closing(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool: 
     return pool.map(workerFunction, enumerate(arguments)) 

Uwaga: Ja również usunąć wyraźną przemianę list (bezsensowne, ponieważ map będzie iteracyjne enumerate iterator dla Ciebie), a zwracana wyniki bezpośrednio (nie ma potrzeby przypisywania nazwy tylko do powrotu w następnym wierszu).

Jeśli chcesz zapewnić natychmiastowe zakończenie w wyjątku (w Pythonie przed 3.3), użyj bloku try/finally lub napisz prostego menedżera kontekstu (który może być ponownie użyty w innych miejscach, w których używasz Pool):

from contextlib import contextmanager 

@contextmanager 
def terminating(obj): 
    try: 
     yield obj 
    finally: 
     obj.terminate() 

def computeStuff(arguments, globalData, concurrent=True): 
    with terminating(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool: 
     return pool.map(workerFunction, enumerate(arguments)) 

podejście terminating jest lepszy, gdyż natychmiast gwarantuje wyjście procesów; teoretycznie, jeśli używasz wątków w innym miejscu w głównym programie, pracownicy Pool mogą być rozwidlani wątkami nie będącymi demonami, które utrzymywałyby procesy przy życiu nawet po zakończeniu wątku zadania roboczego; terminating ukrywa to przez zabijanie procesów na siłę.

Jeśli interpreter jest Python 3.3 lub wyższy, podejście terminating jest wbudowany w Pool, więc nie ma specjalnego owinięcie jest potrzebne do rachunku with, with multiprocessing.Pool(initializer=initWorker, initargs=(globalData,)) as pool: prace bezpośrednio.

+0

bardzo ładne, thx! – Ant6n

Powiązane problemy