Mam problem z używaniem Pool.map_async() (i także Pool.map()) w module przetwarzania wieloprocesowego. Zaimplementowałem funkcję równoległą do pętli, która działa dobrze, dopóki wejście funkcji do Pool.map_async jest funkcją "regularną". Gdy funkcją jest np. metody do klasy, a potem dostaję PicklingError:Błąd PicklingError podczas korzystania z przetwarzania wieloprocesowego
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
używam Python tylko do obliczeń naukowych, więc nie jestem tak obeznany z pojęciem peklowania, właśnie dowiedziałem się trochę o tym dzisiaj. Przyjrzałem się kilku wcześniejszym odpowiedziom, takim jak Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), ale nie wiem, jak to zrobić, nawet jeśli podążam za linkiem podanym w odpowiedzi.
Mój kod, miały na celu symulację wektora Normal r.v za pomocą wielu rdzeni. Zauważ, że jest to tylko przykład i może nawet nie opłaca się działać na wielu rdzeniach.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
Po linku podanego w odpowiedzi na pytanie, w Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), Steven Bethard (prawie na końcu) sugeruje zastosowanie modułu copy_reg. Jego kod to:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
Nie bardzo rozumiem, jak mogę z tego skorzystać. Jedyne, co mogłem wymyślić, to umieszczenie go tuż przed moim kodem, ale to nie pomogło. Prostym rozwiązaniem jest oczywiście skorzystanie z tego, który działa i unikanie angażowania się w copy_reg. Jestem bardziej zainteresowany uzyskaniem copy_reg, aby działał poprawnie, aby w pełni korzystać z wieloprocesowości bez konieczności omijania problemu za każdym razem.
Dziękuję za pomoc, jest to bardzo cenne.
Matias
Dziękuję za Twoją odpowiedź. Mam pytanie i byłbym bardzo wdzięczny, gdybyś mógł odpowiedzieć: 1. Mówisz: "(1) albo zmień kod, aby przekazać funkcję - a nie metodę - procesom roboczym, ... ". To jest to, co robię w mojej drugiej próbie, tj. Z funkcją test(), prawda? Moje pytanie brzmi: jeśli NIE przekazuję funkcji, dlaczego to działa? Czy masz na myśli, że mogę napotkać przyszłe błędy? Próbowałem twojego kodu i to działało, ale nie widzę sensu "komplikowania" rzeczy, jeśli moja pierwsza alternatywa już działała. – matiasq
Chciałbym również zaznaczyć, że twoja alternatywa (2) nie będzie dla mnie działać, ponieważ moim głównym problemem jest to, że klasa, której używam, nie jest dostępna do wybrania. Próbowałem omijać to za pomocą copy_reg, co powinno być możliwe, ponieważ Steve Bethard użył drugiego napisanego przeze mnie kodu i zadziałało to dla niego. Ponownie, dziękuję bardzo za poświęcony czas. – matiasq
Co do mojego pierwszego posta, myliłem się. Napisałem twój kod, ale nie miało to żadnego wpływu, ponieważ "if isinstance (func, types.MethodType):" nigdy nie było prawdziwe i dlatego kod nie został wykonany. Przepraszam, że wcześniej tego nie zauważyłem. – matiasq