2013-04-07 14 views
10

Posiadam dużą liczbę niestandardowych obiektów, które są potrzebne do wykonywania niezależnych (równoległych) zadań, w tym modyfikowania parametrów obiektu. Próbowałem używać zarówno menadżera(). Dict, jak i "sharedmem", ale żadne z nich nie działa. Na przykład:Modyfikuj obiekt w trybie wieloprocesorowym Pythona

import numpy as np 
import multiprocessing as mp 
import sharedmem as shm 


class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn): 
    test.num = np.random.randn() 
    test.name = nn 


if __name__ == '__main__': 

    num = 10 

    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 

    sh_tests = shm.empty(num, dtype=object) 
    for it in range(num): 
     sh_tests[it] = tests[it] 
     print sh_tests[it] 

    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some')) for test in sh_tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    for test in sh_tests: print test 

drukuje:

0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 


0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 

Tj obiekty nie są modyfikowane.

Jak osiągnąć pożądane zachowanie?

+0

http: // stackoverflow.com/questions/10721915/shared-memory-objects-in-python-multiprocessing – tacaswell

+0

Czy możesz umieścić link do 'sharedmem' nie mogę znaleźć niczego na ten temat. – tacaswell

Odpowiedz

8

Problem polega na tym, że kiedy przedmioty są przekazywane do procesów roboczych, są pakowane z marynarką, wysyłane do innego procesu, gdzie są rozpakowywane i przepracowywane. Twoje obiekty nie są tak bardzo przekazywane do drugiego procesu, jak klonowane. Nie zwracasz obiektów, więc sklonowany obiekt zostaje szczęśliwie zmodyfikowany, a następnie odrzucony.

Wygląda na to, że nie można tego zrobić (Python: Possible to share in-memory data between 2 separate processes) bezpośrednio.

Co można zrobić, to zwrócić zmodyfikowane obiekty.

import numpy as np 
import multiprocessing as mp 



class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn, out_queue): 
    print test.num 
    test.num = np.random.randn() 
    print test.num 
    test.name = nn 
    out_queue.put(test) 




if __name__ == '__main__':  
    num = 10 
    out_queue = mp.Queue() 
    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 


    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue)) for test in tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    res_lst = [] 
    for j in range(len(workers)): 
     res_lst.append(out_queue.get()) 

    for test in res_lst: print test 

ta ma prowadzić do interesujących obserwacji, że ponieważ procesów potomnych są identyczne, wszystkie one zaczynają z tego samego materiału siewnego dla liczby losowej, więc wszystko generować ten sam „” losowy numer.

+0

Linia '' workers = [mp.Process (... '' wygląda na to, że zaczynasz proces 'num'' (wszystko w tym samym czasie?) W twoim przykładzie jest to tylko dziesięć, ale jak byś to zrobił zastosować to do większych tablic zawierających tysiące lub miliony wpisów (a więc pracowników)? –

3

Nie widzę, abyście przekazywali referencje do procesów potomnych, więc nie widzę, jak praca przez nich wykonana mogłaby zostać zapisana w pamięci współdzielonej. Być może czegoś tutaj brakuje.

Alternatywnie, czy bierzesz pod uwagę numpy.memmap? (BTW: tcaswell, moduł, o którym tutaj mowa, wydaje się być: numpy-sharedmem).

Również warto przeczytać Sturla Molden za Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing (PDF) zgodnie z zaleceniami w odpowiedzi unutbu do [StackOverflow: Jak mogę przekazać duże tablice numpy między podprocesów Pythona bez zapisywania na dysku?] I (How do I pass large numpy arrays between python subprocesses without saving to disk?). i Joe Kingtona StackOverflow: NumPy vs. multiprocessing and mmap.

Mogą być bardziej inspirujące niż bezpośrednio istotne.

+0

dzięki za wskaźnik.Polecam, ale nie byłeś pewien, czy to był odpowiedni pakiet. – tacaswell

+0

+1 za miły zbiór linków! – tacaswell

3

Twój kod nie próbuje modyfikować pamięci współdzielonej. Po prostu klonuje poszczególne obiekty.

dtype=objectsharedmem oznacza, że ​​nie będzie działać z powodów przedstawionych in the link provided by @tcaswell:

dzielenie wykresów obiektów, które zawierają referencje/odnośniki do innych obiektów, jest zasadniczo niewykonalne

dla zwykłego (wartości) typy, z których można korzystać z pamięci współdzielonej, patrz: Use numpy array in shared memory for multiprocessing.

Podejście manager powinny również działać (to tylko kopie obiektów w okolicy):

import random 
from multiprocessing import Pool, Manager 

class Tester(object): 
    def __init__(self, num=0.0, name='none'): 
     self.num = num 
     self.name = name 

    def __repr__(self): 
     return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name) 

def init(L): 
    global tests 
    tests = L 

def modify(i_t_nn): 
    i, t, nn = i_t_nn 
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy 
    t.name = nn 
    tests[i] = t # copy back 
    return i 

def main(): 
    num_processes = num = 10 #note: num_processes and num may differ 
    manager = Manager() 
    tests = manager.list([Tester(num=i) for i in range(num)]) 
    print(tests[:2]) 

    args = ((i, t, 'some') for i, t in enumerate(tests)) 
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,)) 
    for i in pool.imap_unordered(modify, args): 
     print("done %d" % i) 
    pool.close() 
    pool.join() 
    print(tests[:2]) 

if __name__ == '__main__': 
    main() 
Powiązane problemy