2015-12-01 12 views
8

Chciałbym przekazać argumenty słów kluczowych do mojej funkcji pracownika z Pool.map(). Nie mogę znaleźć tego wyraźnego przykładu podczas przeszukiwania forów.przekazywanie kwargs z multiprocessing.pool.map

Przykład Kod:

import multiprocessing as mp 

def worker((x,y), **kwargs): 
    kwarg_test = kwargs.get('kwarg_test', False) 
    print("kwarg_test = {}".format(kwarg_test))  
    if kwarg_test: 
     print("Success") 
    return x*y 

def wrapper_process(**kwargs): 
    jobs = [] 
    pool=mp.Pool(4) 
    for i, n in enumerate(range(4)): 
     jobs.append((n,i)) 
    pool.map(worker, jobs) #works 
    pool.map(worker, jobs, kwargs) #how to do this? 

def main(**kwargs): 
    worker((1,2),kwarg_test=True) #accepts kwargs 
    wrapper_process(kwarg_test=True) 

if __name__ == "__main__":  
    main() 

wyjściowa:

kwarg_test = True 
Success 
kwarg_test = False 
kwarg_test = False 
kwarg_test = False 
kwarg_test = False 
TypeError: unsupported operand type(s) for //: 'int' and 'dict' 

Błąd typ ma do czynienia z argumentami analizowania wewnątrz multiprocessing.Pool lub kolejki, a ja próbowałem kilka innych składnie, jak robienie lista kwargów; [kwargs, kwargs, kwargs, kwargs], a także kilka prób włączenia kwarg do listy zadań, ale bez powodzenia. Śledziłem kod w trybie wieloprocesorowym. Z mapy do map_async i dotarłem do 01: w pool.py po napotkaniu struktury generatora. Jestem szczęśliwy, aby dowiedzieć się więcej na ten temat w przyszłości, ale na razie próbuję się dowiedzieć:

Czy istnieje prosta składnia pozwalająca na przekazanie kwargs z pool.map?

Odpowiedz

6

W multiprocessing.pool.Pool.map doc stany:

równoległy odpowiednik mapie() funkcja wbudowana (to obsługuje tylko jeden argument, choć iterowalny). Blokuje się, aż wynik będzie gotowy.

Możemy przekazać tylko jeden argument iteracyjny. Koniec historii. Ale możemy Luckilly myśleć obejście: define worker_wrapper funkcję, która pobiera jeden argument, rozpakowuje je args i kwargs, a następnie przekazuje je do worker:

def worker_wrapper(arg): 
    args, kwargs = arg 
    return worker(*args, **kwargs) 

W swojej wrapper_process, trzeba skonstruować ten jeden argument z jobs (lub nawet bezpośrednio przy konstruowaniu pracy) i wywołać worker_wrapper:

arg = [(j, kwargs) for j in jobs] 
pool.map(worker_wrapper, arg) 

Oto implementacja działa, przechowywane jak najbardziej zbliżone do oryginalnego kodu :

import multiprocessing as mp 

def worker_wrapper(arg): 
    args, kwargs = arg 
    return worker(*args, **kwargs) 

def worker(x, y, **kwargs): 
    kwarg_test = kwargs.get('kwarg_test', False) 
    # print("kwarg_test = {}".format(kwarg_test))  
    if kwarg_test: 
     print("Success") 
    else: 
     print("Fail") 
    return x*y 

def wrapper_process(**kwargs): 
    jobs = [] 
    pool=mp.Pool(4) 
    for i, n in enumerate(range(4)): 
     jobs.append((n,i)) 
    arg = [(j, kwargs) for j in jobs] 
    pool.map(worker_wrapper, arg) 

def main(**kwargs): 
    print("=> calling `worker`") 
    worker(1, 2,kwarg_test=True) #accepts kwargs 
    print("=> no kwargs") 
    wrapper_process() # no kwargs 
    print("=> with `kwar_test=True`") 
    wrapper_process(kwarg_test=True) 

if __name__ == "__main__":  
    main() 

który przechodzi test:

=> calling `worker` 
Success 
=> no kwargs 
Fail 
Fail 
Fail 
Fail 
=> with `kwar_test=True` 
Success 
Success 
Success 
Success 
8

Jeśli chcesz iteracyjne nad inne argumenty, użyj @ odpowiedź ArcturusB użytkownika.

Jeśli tylko chcesz je przekazać, mające taką samą wartość dla każdej iteracji, to można to zrobić:

from functools import partial 
pool.map(partial(worker, **kwargs), jobs) 

Partial argumenty „” wiąże się z funkcją. Stare wersje Pythona cannot serializują jednak obiekty częściowe.

Powiązane problemy