2012-04-01 18 views
19

Korzystam z modułu Pythona multiprocessing do równoległego przetwarzania dużych tablic numpy. Tablice są odwzorowywane w pamięci przy użyciu numpy.load(mmap_mode='r') w procesie nadrzędnym. Po tym, multiprocessing.Pool() przedstawia proces (przypuszczam).NumPy vs. wieloprocesorowość i mmap

Wszystko wydaje się działać dobrze, z wyjątkiem jestem coraz linie takie jak:

AttributeError ("obiekt 'NoneType' nie ma atrybutu 'mówią'") w <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> ignorowane

w unittest logs. Testy jednak mijają dobrze.

Każdy pomysł, co się tam dzieje?

Korzystanie z Pythona 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

Po Trcohę, że ścigani przyczynę na ścieżkę kodu, który był przy użyciu (mały kawałek) Ten odwzorowany pamięci numpy tablicy jako dane wejściowe do Pool.imap połączenia.

Najwyraźniej "kwestia" polega na tym, że multiprocessing.Pool.imap przekazuje dane wejściowe do nowych procesów: wykorzystuje pikle. To nie działa z mmap ed numpy tablicami i czymś wewnątrz przerw, które prowadzi do błędu.

Znalazłem this reply Roberta Kerna, który wydaje się adresować ten sam problem. Sugeruje utworzenie specjalnej ścieżki kodu, gdy wejście imap pochodzi z tablicy odwzorowanej w pamięci: mapowanie pamięci w tej samej tablicy ręcznie w zarodkowanym procesie.

To byłoby tak skomplikowane i brzydkie, że wolałbym żyć z błędem i dodatkowymi kopiami pamięci. Czy istnieje inny sposób, który byłby lżejszy przy modyfikowaniu istniejącego kodu?

Odpowiedz

22

Moje zwykłe podejście (jeśli można żyć z dodatkowymi kopiami pamięci) polega na wykonaniu wszystkich operacji we/wy w jednym procesie, a następnie wysłaniu rzeczy do puli wątków roboczych. Aby załadować wycinek macierzy memmapowanej do pamięci, po prostu nie wykonuj tego, co może prowadzić do pewnego zamieszania. x = np.array(data[yourslice]) (data[yourslice].copy())

Po pierwsze, niech wygenerować dane testowe:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

można odtworzyć swoje błędy z czymś takim:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

A jeśli po prostu przełączyć się na plonowanie np.array(data[start:stop]) zamiast, będziesz Napraw problem:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

To oczywiście powoduje dodatkowe kopia w pamięci każdej porcji.

W dłuższej perspektywie najprawdopodobniej łatwiej będzie odejść od niepmapowanych plików i przejść do formatu HDF. Jest to szczególnie prawdziwe, jeśli twoje dane są wielowymiarowe. (Polecam h5py, ale pyTables jest dobre, jeśli twoje dane są "podobne do tabeli".)

Powodzenia, w każdym razie!

+0

Joe twoje odpowiedzi zawsze rock. Właśnie próbowałem wymyślić coś takiego. – YXD

+0

Dzięki za końcówkę HDF. Wygląda na ogromną zmianę, ale może warto, sprawdzę to. – user124114