2011-09-05 12 views
11

Mam tablicę Numpy 256x256x256, w której każdy element jest macierzą. Muszę wykonać kilka obliczeń na każdej z tych macierzy i chcę użyć modułu multiprocessing, aby przyspieszyć działanie.Łączenie itertools i multiprocessing?

Wyniki tych obliczeń musi być przechowywana w tablicy 256x256x256 jak pierwotnego, tak że w wyniku osnowy w elemencie [i,j,k] w oryginalnej matrycy musi być umieszczona w elemencie [i,j,k] nowej macierzy.

Aby to zrobić, chcę utworzyć listę, która mogłaby być zapisana w pseudo-ish sposób jako [array[i,j,k], (i, j, k)] i przekazać ją do funkcji, która ma być "wieloprocesowa". Zakładając, że matrices znajduje się lista wszystkich matryc pochodzących z oryginalnej tablicy i myfunc jest funkcja prowadzenia obliczeń, kod będzie wyglądał mniej więcej tak:

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

Jednak wydaje się, że map_async jest faktycznie tworzenia tego Ogromna finput -lista pierwsza: Mój procesor nie robi wiele, ale pamięć i zamiana zostają całkowicie zużyte w ciągu kilku sekund, co oczywiście nie jest tym, czego chcę.

Czy istnieje sposób na przekazanie tej ogromnej listy do funkcji przetwarzania wieloprocesowego bez potrzeby jej jawnego tworzenia? Czy znasz inny sposób rozwiązania tego problemu?

Wielkie dzięki! :-)

+1

Ponieważ używasz 'get()' na 'map_async()', prawdopodobnie nie potrzebujesz * operacji asynchronicznej * i zamiast tego użyj 'Pool.map()'. –

+0

Może nie rozumiem tego problemu poprawnie, ale czy uważasz, że imap lub imap_unordered? –

Odpowiedz

10

Wszystkie metody pobierają iteratory w pełni (demo code) zaraz po wywołaniu funkcji. Karmić kawałki funkcyjnych Mapa iterator jednym kawałku na raz, użyj grouper_nofill:

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS. Parametr pool.map_async o nazwie "chunksize" robi coś innego: dzieli fragment iteracji na porcje, a następnie przekazuje każdy fragment do procesu roboczego, który wywołuje map(func,chunk). Może to dać pracownikowi więcej danych do przeżycia, jeśli func(item) kończy się zbyt szybko, ale to nie pomaga w twojej sytuacji, ponieważ iterator nadal jest zużywany w pełni zaraz po wywołaniu map_async.

+0

Dziękuję bardzo! Twoje rozwiązanie rzeczywiście działa! Dla odniesienia, musiałem użyć pool.map_async (myfunc, finput) .get (999999), ale działa! Jednak nadal używa dużej ilości pamięci (oczywiście w zależności od dokładnego rozmiaru), a python nie wydaje się być zbędnym zbieraniem podczas działania. Jakieś pomysły, dlaczego tak się dzieje? – digitaldingo

+0

@digitaldingo: Hm, nic nie przychodzi mi do głowy. Byłoby idealnie, gdybyś mógł zredukować swój kod do [SSCCE] (http://sscce.org/) i opublikować go tutaj. – unutbu

0

Pool.map_async() musi znać długość pliku, aby wysłać pracę do wielu pracowników. Ponieważ izip nie ma __len__, konwertuje najpierw iterowalną na listę, powodując ogromne zużycie pamięci.

Możesz spróbować tego uniknąć, tworząc własny iterator stylu izip z __len__.

+0

dlaczego musi to wiedzieć?dlaczego nie może po prostu nakarmić wszystkich bezczynnych pracowników i czekać? –

+0

@andrew - Pierwsze linie w 'map_async()' ('multiprocessing/pool.py') są w rzeczywistości' if not hastr (iterable, '__len__'): iterable = list (iterable) '. Musi znać długość, aby utworzyć wystarczająco dużą listę wyjściową, ponieważ kolejność ułożenia robotników jest nieznana. –

+0

hmmm. może to konstruować tak dynamicznie, czyż nie? Po prostu myślę, że to może być podniesione jako problem. wydaje się być prawidłową prośbą. –

2

Wpadłem również na ten problem. Zamiast tego:

res = p.map(func, combinations(arr, select_n)) 

zrobić

res = p.imap(func, combinations(arr, select_n)) 

IMAP nie zużywa go!