2013-08-24 12 views
18

Oto program:Użycie pamięci ciągle rośnie z multiprocessing.pool Pythona

#!/usr/bin/python 

import multiprocessing 

def dummy_func(r): 
    pass 

def worker(): 
    pass 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    for index in range(0,100000): 
     pool.apply_async(worker, callback=dummy_func) 

    # clean up 
    pool.close() 
    pool.join() 

znalazłem wykorzystanie pamięci (zarówno VIRT i RES) rósł aż do końca()/join(), to istnieje rozwiązanie pozbyć się tego? Próbowałem maxtasksperchild z 2.7, ale to też nie pomogło.

Mam bardziej skomplikowany program, który wywołuje apply_async() ~ 6M razy, a przy ~ 1,5M punkcie Mam już 6G + OZE, aby uniknąć wszystkich innych czynników, uprościłem program do powyższej wersji.

EDIT:

Okazało się ta wersja działa lepiej, dzięki za wkład każdego z nas:

#!/usr/bin/python 

import multiprocessing 

ready_list = [] 
def dummy_func(index): 
    global ready_list 
    ready_list.append(index) 

def worker(index): 
    return index 

if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes=16) 
    result = {} 
    for index in range(0,1000000): 
     result[index] = (pool.apply_async(worker, (index,), callback=dummy_func)) 
     for ready in ready_list: 
      result[ready].wait() 
      del result[ready] 
     ready_list = [] 

    # clean up 
    pool.close() 
    pool.join() 

nie umieszczać żadnych blokadę tam wierzę główny proces jest pojedynczy gwintowany (zwrotna jest mniej więcej tak, jak rzeczy sterowane zdarzeniami na dokumenty, które czytam).

Zmieniłem zakres indeksu v1 na 1 000 000, tak samo jak v2 i zrobiłem kilka testów - dla mnie to dziwne v2 jest nawet o 10% szybsze niż v1 (33s vs 37s), może v1 wykonywał zbyt wiele wewnętrznych zadań konserwacyjnych list. v2 jest zdecydowanie zwycięzcą pod względem wykorzystania pamięci, nigdy nie przekroczyło 300M (VIRT) i 50M (RES), podczas gdy v1 było 370M/120M, najlepsze było 330M/85M. Wszystkie liczby były tylko 3 ~ 4 razy testowanie, tylko odniesienia.

+1

Wystarczy spekulować, ale kolejkowanie miliona obiektów zajmuje miejsce. Być może pomoże im grupowanie. Dokumenty nie są definitywne, ale [przykład] (http://pydoc.net/Python/multiprocessing/2.6.2.1/multiprocessing.examples.mp_pool/) (szukaj Testowania wywołania zwrotnego) pokazuje, że wynik testu jest oczekiwany, nawet gdy są wywołania zwrotne. Oczekiwanie może być konieczne, aby usunąć kolejkę wyników. – tdelaney

+0

Tak więc multiprocessing.pool może nie być odpowiednim narzędziem dla mnie, ponieważ wywołanie zwrotne w rzeczywistości nie wykonuje zadań czyszczenia, czy można wykonać czyszczenie w wywołaniu zwrotnym? Problem polega na tym, że nie mogę czekać po wywołaniu apply_async(), ponieważ w rzeczywistym świecie robot() zajmuje ~ 0,1 sekundy na żądanie (kilka żądań HTTP). –

+1

Wild guess: 'apply_asynch' tworzy instancję [' AsynchResult'] (http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult). "Pula" prawdopodobnie ma pewne odniesienie do tych obiektów, ponieważ muszą one być w stanie zwrócić wynik po zakończeniu obliczeń, ale w swojej pętli po prostu je wyrzucasz. Prawdopodobnie powinieneś nazwać 'get()' lub 'wait()' na wynikach asynch w pewnym momencie, może używając argumentu 'callback'' apply_asynch'. – Bakuriu

Odpowiedz

6

Użyj map_async zamiast apply_async, aby uniknąć nadmiernego zużycia pamięci.

przypadku pierwszego przykładu, zmień następujące dwa wiersze:

for index in range(0,100000): 
    pool.apply_async(worker, callback=dummy_func) 

do

pool.map_async(worker, range(100000), callback=dummy_func) 

Będzie ona zakończyć w mgnieniu zanim będzie można zobaczyć jego zużycie pamięci w top. Zmień listę na większą, aby zobaczyć różnicę. Ale uwaga map_async najpierw konwertuje przechodzącą do iteracji iterowalną listę, aby obliczyć jej długość, jeśli nie ma ona metody __len__. Jeśli masz iterator dużej liczby elementów, możesz użyć itertools.islice, aby przetworzyć je w mniejszych porcjach.

Miałem problem z pamięcią w prawdziwym programie z dużo większą ilością danych i ostatecznie okazało się, że sprawcą był apply_async.

P.S., w odniesieniu do użycia pamięci, twoje dwa przykłady nie mają oczywistej różnicy.

4

Mam bardzo duży zestaw danych chmury punktów 3D, które przetwarzam. Próbowałem użyć modułu do przetwarzania wieloprocesowego, aby przyspieszyć przetwarzanie, ale zacząłem wychodzić z błędów pamięci. Po kilku badaniach i testach ustaliłem, że kolejka zadań do przetworzenia była znacznie szybsza niż podprocesy mogły ją opróżnić. Jestem pewien, że przez chunking, lub używając map_async lub coś, co mogłem dostosować obciążenia, ale nie chciałem wprowadzić istotnych zmian do otaczającej logiki.

Głupie rozwiązanie, na które trafiłem, to okresowe sprawdzanie długości pool._cache, a jeśli pamięć podręczna jest zbyt duża, należy poczekać, aż kolejka się opróżni.

W moim MainLoop miałem już licznik i ticker Status:

# Update status 
count += 1 
if count%10000 == 0: 
    sys.stdout.write('.') 
    if len(pool._cache) > 1e6: 
     print "waiting for cache to clear..." 
     last.wait() # Where last is assigned the latest ApplyResult 

Więc każdy 10k wstawiania do basenu sprawdzić, jeśli istnieje więcej niż 1 milion operacji w kolejce (około 1GB pamięci wykorzystywane w główny proces). Gdy kolejka jest pełna, czekam tylko na zakończenie ostatniego wstawionego zadania.

Teraz mój program może działać przez wiele godzin bez wyczerpania pamięci. Główny proces po prostu się zatrzymuje, podczas gdy pracownicy kontynuują przetwarzanie danych.

BTW _cache członkiem jest udokumentowany przykład moduł basen wieloprocesorowe:

# 
# Check there are no outstanding tasks 
# 

assert not pool._cache, 'cache = %r' % pool._cache 
15

miałem problemów z pamięcią niedawno, ponieważ używałem wiele razy funkcja wieloprocesorowe, więc utrzymanie procesów tarła i pozostawiając je w pamięć.

Oto rozwiązanie używam teraz:

def myParallelProcess(ahugearray) 
from multiprocessing import Pool 
from contextlib import closing 
with closing(Pool(15)) as p: 
    res = p.imap_unordered(simple_matching, ahugearray, 100) 
return res 

I ❤ z

+2

Rozwiązało to mój problem po spędzeniu dni w tej sprawie! Wielkie dzięki! Tworzyłem pulę wewnątrz pętli, więc skończyło się na stworzeniu zbyt wielu procesów, z których każdy zużywał tyle pamięci i nigdy nie wychodził. Po prostu potrzebowałem zrobić mypool.close() na końcu pętli – MohamedEzz

1

myślę, że jest podobny do the question I posted, ale nie jestem pewien, że mają takie samo opóźnienie. Mój problem polegał na tym, że generowałem wyniki z puli wieloprocesowej szybciej, niż je zużywałem, więc gromadziły się w pamięci. Aby tego uniknąć, użyłem semaphore do dławienia wejść do puli, aby nie były zbyt daleko przed wyjściami, które zużyłem.

0

Po prostu utwórz pulę w pętli i zamknij ją na końcu pętli za pomocą pool.close().

Powiązane problemy