2012-06-24 13 views
19

Używam multiprocessing.imap_unordered do wykonywania obliczeń na listę wartości:wieloprocesorowe Pythona i pamięć

def process_parallel(fnc, some_list): 
    pool = multiprocessing.Pool() 
    for result in pool.imap_unordered(fnc, some_list): 
     for x in result: 
      yield x 
    pool.terminate() 

Każde wywołanie fnc zwraca ogromny obiekt w wyniku, przez projekt. Mogę przechowywać N wystąpień takiego obiektu w pamięci RAM, gdzie N ~ cpu_count, ale niewiele więcej (nie setki).

Teraz użycie tej funkcji zajmuje zbyt dużo pamięci. Pamięć jest całkowicie wydatkowana w głównym procesie, a nie w pracownikach.

W jaki sposób imap_unordered przechowuje wyniki końcowe? Mam na myśli wyniki, które zostały już zwrócone przez pracowników, ale jeszcze nie zostały przekazane użytkownikowi. Myślałem, że to mądre i obliczało je "leniwie" w razie potrzeby, ale najwyraźniej nie.

Wygląda na to, że nie mogę wystarczająco szybko zużyć wyników z process_parallel, pula utrzymuje w kolejce te ogromne obiekty od fnc gdzieś, wewnętrznie, a następnie wysadza w powietrze. Czy istnieje sposób, aby tego uniknąć? Jakoś ograniczyć wewnętrzną kolejkę?


Używam Python2.7. Twoje zdrowie.

+0

Cóż z tego, co widzę "wydajność" jest w głównym procesie, nie wewnątrz 'fnc' (tj. Funkcja wykonywana przez pracowników). Czy 'fnc' samo robi leniwą ocenę? – Felix

+0

@FelixBonkoski Nie, 'fnc' pobiera pojedynczy element z' some_list' i oblicza i zwraca z niego ogromny obiekt. – user124114

+0

Ograniczenie tylko w oparciu o dostępną pamięć. –

Odpowiedz

10

Jak widać, patrząc na odpowiedni plik źródłowy (python2.7/multiprocessing/pool.py), IMapUnorderedIterator używa instancji collections.deque do przechowywania wyników. Jeśli pojawi się nowy element, zostanie on dodany i usunięty w iteracji.

Jak zasugerowałeś, jeśli pojawi się kolejny duży obiekt, podczas gdy główny wątek nadal przetwarza obiekt, te będą również przechowywane w pamięci.

Co możesz spróbować coś takiego:

it = pool.imap_unordered(fnc, some_list) 
for result in it: 
    it._cond.acquire() 
    for x in result: 
     yield x 
    it._cond.release() 

Powinno to spowodować, że zadanie-wynik-odbiornik wątku zablokowane podczas przetwarzania elementu, jeśli próbuje się postawić kolejny obiekt w deque. Tak więc nie powinno być więcej niż dwa ogromne obiekty w pamięci. Jeśli to działa w twoim przypadku, nie wiem;)

+0

Nie podążam za tym przykładem, czy nie jest "po prostu" generatorem i jako taki nie będzie miał metod "_cond.acquire()" i "release"? Jeśli musisz napisać je samemu, jakiego rodzaju obiektem musi być '._cond'? – Hooked

+0

Wygląda na to, że użytkownik troszczy się o wydajność, dlaczego ograniczyć go do małej liczby za pomocą prostej blokady? –

+0

@Hooked: 'imap_unordered' zwraca' IMapUnorderedIterator', który ma te funkcje, które można zobaczyć, przeglądając odpowiedni kod źródłowy. Ponieważ nić wynik-odbiorca (po otrzymaniu wyniku) wymaga, aby blokada wprowadziła wynik do kresek, zablokuje to wątek i powstrzyma go przed zużywaniem większej ilości pamięci. – rumpel

2

Najprostszym rozwiązaniem, które mogę wymyślić, jest dodanie zamknięcia do zawijania funkcji fnc, która używa semafora do kontrolowania całkowitej liczby równoczesnych zadań wykonania, które można wykonać jednocześnie (zakładam, że główny proces/wątek zwiększy semafor). Wartość semafora może być obliczona na podstawie wielkości zadania i dostępnej pamięci.

Powiązane problemy