2013-05-14 18 views
5

Próbuję użyć pakietu multiprocessing w python z puli.Python multiprocessing Pool with map_async

mam f funkcja, która jest wywoływana przez funkcję map_async:

from multiprocessing import Pool 

def f(host, x): 
    print host 
    print x 

hosts = ['1.1.1.1', '2.2.2.2'] 
pool = Pool(processes=5) 
pool.map_async(f,hosts,"test") 
pool.close() 
pool.join() 

Kod ten ma następny błąd:

Traceback (most recent call last): 
    File "pool-test.py", line 9, in <module> 
    pool.map_async(f,hosts,"test") 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async 
    result = MapResult(self._cache, chunksize, len(iterable), callback) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__ 
    self._number_left = length//chunksize + bool(length % chunksize) 
TypeError: unsupported operand type(s) for //: 'int' and 'str' 

nie wiem jak przekazać więcej niż 1 argumentu do funkcji f. Czy jest jakiś sposób?

+0

Po prostu można użyć 'pool.map' i upuść "test"' 'zmienną zupełnie obojętne. – danodonovan

Odpowiedz

7

"test" jest interpretowany jako argument słowa kluczowego chunksize (patrz the docs).

Kod powinien prawdopodobnie (tu kopia wklejony z mojego sesji ipython):

from multiprocessing import Pool 

def f(arg): 
    host, x = arg 
    print host 
    print x 

hosts = ['1.1.1.1', '2.2.2.2'] 
args = ((host, "test") for host in hosts) 
pool = Pool(processes=5) 
pool.map_async(f, args) 
pool.close() 
pool.join() 
## -- End pasted text -- 

1.1.1.1 
test 
2.2.2.2 
test 

Uwaga: W Pythonie 3 można użyć starmap, który rozpakuje argumenty z krotek. Będziesz w stanie uniknąć wyraźnej konieczności wykonania host, x = arg.

+0

Testowałem to, ale wynik nie jest dobry; drukuje oba hosty, ale tylko "t" i "e" słowa "test". – dseira

+0

Dziwne. Z pewnością nie robi tego na moim komputerze. Zobacz aktualizację moich wyników - skopiowałem je i ponownie je sprawdziłem. –

+0

Z x = ["test", "test"] działa, ale nie ma sensu, ponieważ wyobraża sobie, że lista hostów ma około 10000 i chcę tylko jeden x do porównania wyników. Nie jest możliwe posiadanie listy x zawierającej 10000 wpisów z tym samym wynikiem. W każdym razie, dzięki. – dseira

1

jak przypominam, Pool(). Map() i .map_async() konkretnie akceptują tylko jeden argument. to ograniczenie można obejść, przekazując listę, ale oczywiście potrzebna jest zindywidualizowana funkcja zaprojektowana do wzięcia obiektu typu list (podobnego) jako argumentu.

jednym podejściem jest jednorazowe napisanie niestandardowego kodu - a także ogólne opakowanie "funkcja + args". i przerabia się coś takiego (Uwaga: to jest tylko częściowo sprawdzone)

def tmp_test(): 
    # a short test script: 
    # 
    A=[[1,2], [2,3], [4,5], [6,7]] 
    P=mpp.Pool(mpp.cpu_count()) 
    X=P.map_async(map_helper, [[operator.eq]+a for a in A]) 
    # 
    return X.get() 


def null_funct(args=[], kwargs={}): 
    # a place-holder 
    pass 
# 
def map_helper(args_in = [null_funct, [], {}]): 
    # helper function for pool.map_async(). pass data as a list(-like object): 
    # [function, [args], {kwargs}] (though we'll allow for some mistakes). 
    # 
    funct = args_in[0] 
    # 
    # allow for different formatting options: 
    if not (isinstance(args_in[1], list) or isinstance(args_in[1], tuple) or isinstance(args_in[1], dict)): 
     # probably passed a list of parameters. just use them: 
     args = args_in[1:] 
     # 
     return funct(*args) 
    # 
    # if the args are "properly" formatted: 
    args=[] 
    kwargs = {} 
    for arg in args_in[1:]: 
     # assign list types to args, dict types to kwargs... 
     if isinstance(arg, list) or isinstance(arg, tuple): args += arg 
     if isinstance(arg, dict): kwargs.update(arg) 
    return funct(*args, **kwargs) 
3

Basen zwraca menedżera kontekstowe w Pythonie 3, a więc z rachunku mogą być użyte. Pozwala to uniknąć problemów z wyjątkami i oznacza brak konieczności zamykania i dołączania. W tym przypadku funkcja zawsze otrzymuje stałą dla zmiennej x, więc można to zrobić z częściową oceną. map_async jest leniwy, więc musimy uzyskać wynik działania, a także użyć mapy. Zatem:

from multiprocessing import Pool 
from functools import partial 

def f(host, x): 
    print(host) 
    print(x) 

hosts = ('1.1.1.1', '2.2.2.2') 
with Pool(processes=5) as pool: 
    pool.map(partial(f, x='test'), hosts) 

skutkuje:

 
1.1.1.1 
test 
2.2.2.2 
test 
Powiązane problemy