2011-08-10 26 views
26

Mam problem z używaniem Pool.map_async() (i także Pool.map()) w module przetwarzania wieloprocesowego. Zaimplementowałem funkcję równoległą do pętli, która działa dobrze, dopóki wejście funkcji do Pool.map_async jest funkcją "regularną". Gdy funkcją jest np. metody do klasy, a potem dostaję PicklingError:Błąd PicklingError podczas korzystania z przetwarzania wieloprocesowego

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

używam Python tylko do obliczeń naukowych, więc nie jestem tak obeznany z pojęciem peklowania, właśnie dowiedziałem się trochę o tym dzisiaj. Przyjrzałem się kilku wcześniejszym odpowiedziom, takim jak Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), ale nie wiem, jak to zrobić, nawet jeśli podążam za linkiem podanym w odpowiedzi.

Mój kod, miały na celu symulację wektora Normal r.v za pomocą wielu rdzeni. Zauważ, że jest to tylko przykład i może nawet nie opłaca się działać na wielu rdzeniach.

import multiprocessing as mp 
import scipy as sp 
import scipy.stats as spstat 

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None): 
    """ 
    Purpose: Evaluate function using Multiple cores. 

    Input: 
     func  - Function to evaluate in parallel 
     arg  - Array of arguments to evaluate func(arg) 
     static_arg - The "static" argument (if any), i.e. the variables that are  constant in the evaluation of func. 
     nWorkers - Number of Workers to process computations. 
    Output: 
     func(i, static_arg) for i in args. 

    """ 
    # Prepare arguments for func: Collect arguments with static argument (if any) 
    if static_arg != None: 
     arguments = [[arg] + static_arg for arg in list(args)] 
    else: 
     arguments = args 

    # Initialize workers 
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function 
    result = pool.map_async(func, arguments, chunksize = chunksize) 
    pool.close() 
    pool.join() 

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator. 
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled 
# so this will give an error. 
def genNorm(loc, scale): 
    def subfunc(a): 
     return spstat.norm.rvs(loc = loc, scale = scale, size = a) 
    return subfunc 

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled 
def test(fargs): 
    x, a, b = fargs 
    return spstat.norm.rvs(size = x, loc = a, scale = b) 

# Try it out. 
N = 1000000 

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector. 
args1 = sp.ones(N) 
static_arg = [0, 1] # standarized normal. 

# This gives the PicklingError 
func = genNorm(*static_arg) 
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None) 

# This is OK: 
func = test 
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None) 

Po linku podanego w odpowiedzi na pytanie, w Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), Steven Bethard (prawie na końcu) sugeruje zastosowanie modułu copy_reg. Jego kod to:

def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 

import copy_reg 
import types 

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 

Nie bardzo rozumiem, jak mogę z tego skorzystać. Jedyne, co mogłem wymyślić, to umieszczenie go tuż przed moim kodem, ale to nie pomogło. Prostym rozwiązaniem jest oczywiście skorzystanie z tego, który działa i unikanie angażowania się w copy_reg. Jestem bardziej zainteresowany uzyskaniem copy_reg, aby działał poprawnie, aby w pełni korzystać z wieloprocesowości bez konieczności omijania problemu za każdym razem.

Dziękuję za pomoc, jest to bardzo cenne.

Matias

Odpowiedz

19

Problemem tutaj jest mniej „ogórka” komunikat o błędzie niż koncepcyjny: wieloprocesowe robi bulić swój kod w „pracownik” różnych procesów w celu przeprowadzenia swoją magię.

Następnie wysyła dane do iz innego procesu, płynnie szeregując i usuwając szereg danych (to jest część, która używa pikle).

Gdy część danych przekazywanych tam iz powrotem jest funkcją - zakłada ona, że ​​funkcja o tej samej nazwie istnieje w procesie odczytywania i (jak sądzę) przekazuje nazwę funkcji jako ciąg znaków. Ponieważ funkcje są bezstanowe, wywołany proces roboczy po prostu wywołuje tę samą funkcję z danymi, które otrzymał. (Funkcje Pythona nie mogą być serializowane poprzez pikle, więc tylko odwołanie jest przekazywane między procesem głównym a procesami roboczymi)

Gdy twoja funkcja jest metodą w instancji - chociaż kiedy kodujemy pythona, to jest bardzo podobne do tak samo jak funkcja, z "automatyczną" zmienną self, nie jest ona pod tą samą nazwą. Ponieważ wystąpienia (obiekty) są stanowe. Oznacza to, że proces roboczy nie ma kopii obiektu, który jest właścicielem metody, którą chcesz wywołać po drugiej stronie.

Praca nad metodami przekazywania metody jako funkcji wywołania funkcji map_async również nie będzie działać - ponieważ procesor wieloprocesowy korzysta z referencji funkcji, a nie z rzeczywistej funkcji podczas jej przekazywania.

Powinieneś (1) zmienić kod, aby przekazać funkcję - a nie metodę - procesom roboczym, konwertując wszystkie stany, które obiekt zachowuje na nowe parametry do wywołania. (2) Utwórz funkcję "target" dla wywołania map_async, która rekonstruuje potrzebny obiekt po stronie procesu roboczego, a następnie wywołuje funkcję wewnątrz niego. Najprostsze klasy w Pythonie można samodzielnie dublować, więc możesz przekazać obiekt będący właścicielem funkcji w wywołaniu map_async - a funkcja "target" wywoła odpowiednią metodę po stronie pracownika.

(2) może wydawać się „trudne”, ale to chyba właśnie coś takiego - chyba że klasa Państwa obiekt nie może być trawiona:

import types 

def target(object, *args, **kw): 
    method_name = args[0] 
    return getattr(object, method_name)(*args[1:]) 
(...)  
#And add these 3 lines prior to your map_async call: 


    # Evaluate function 
    if isinstance (func, types.MethodType): 
     arguments.insert(0, func.__name__) 
     func = target 
    result = pool.map_async(func, arguments, chunksize = chunksize) 

* Uwaga: Nie testowałem tego

+0

Dziękuję za Twoją odpowiedź. Mam pytanie i byłbym bardzo wdzięczny, gdybyś mógł odpowiedzieć: 1. Mówisz: "(1) albo zmień kod, aby przekazać funkcję - a nie metodę - procesom roboczym, ... ". To jest to, co robię w mojej drugiej próbie, tj. Z funkcją test(), prawda? Moje pytanie brzmi: jeśli NIE przekazuję funkcji, dlaczego to działa? Czy masz na myśli, że mogę napotkać przyszłe błędy? Próbowałem twojego kodu i to działało, ale nie widzę sensu "komplikowania" rzeczy, jeśli moja pierwsza alternatywa już działała. – matiasq

+0

Chciałbym również zaznaczyć, że twoja alternatywa (2) nie będzie dla mnie działać, ponieważ moim głównym problemem jest to, że klasa, której używam, nie jest dostępna do wybrania. Próbowałem omijać to za pomocą copy_reg, co powinno być możliwe, ponieważ Steve Bethard użył drugiego napisanego przeze mnie kodu i zadziałało to dla niego. Ponownie, dziękuję bardzo za poświęcony czas. – matiasq

+0

Co do mojego pierwszego posta, myliłem się. Napisałem twój kod, ale nie miało to żadnego wpływu, ponieważ "if isinstance (func, types.MethodType):" nigdy nie było prawdziwe i dlatego kod nie został wykonany. Przepraszam, że wcześniej tego nie zauważyłem. – matiasq

Powiązane problemy