2012-01-27 20 views
26

Próbuję użyć puli Worker w python za pomocą obiektów Process. Każdy pracownik (proces) wykonuje pewną inicjalizację (zajmuje nieprzydatny czas), przechodzi serię zadań (najlepiej używając map()) i zwraca coś. Komunikacja nie jest konieczna. Nie mogę jednak wymyślić, jak używać funkcji map() do korzystania z funkcji mojego pracownika compute().Python Pool with worker Processes

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

jest praca w kolejce na drogę zamiast, czy mogę użyć map()?

+0

Wszystkie obiekty procesu są stanowe. Możesz usunąć to słowo z tytułu. Również. "compute" to metoda pracownika. W przykładach jest to zwykle całkowicie niezależna funkcja. Dlaczego nie napisać funkcji obliczeniowej, aby po prostu włączyć zarówno inicjowanie, jak i przetwarzanie? –

+0

Wystarczająco fair, dzięki. Inicjalizacja zajmuje dużo czasu, więc chcę to zrobić tylko raz na proces roboczy. – Felix

+0

Należy podkreślić, że część pytania dotyczy "zaliczenia serii zadań". Ponieważ nie było to oczywiste. –

Odpowiedz

50

Proponuję użyć kolejki do tego.

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

Teraz można uruchomić stos z nich, wszystkie z dostaniem pracy z jednej kolejce

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

Tego rodzaju rzeczy powinny pozwalają amortyzować drogie koszty startowego w wielu robotników.

+0

Tak właśnie pomyślałem, dzięki! W końcu użyłem kolejki zadań (danych wejściowych) i kolejki wyników (danych wyjściowych), aby zsynchronizować wszystko. – Felix

+0

Ty przykład jest niesamowity, próbuję teraz jak wprowadzić obiekty wartownika, gdy naciśnięcie klawisza strg + c zostanie naciśnięte bez wyjątku – Dukeatcoding

+0

@ S.Lott: Czy to nie Queue nie jest w stanie? i dlatego używasz [multiprocessing.Manager() .Queue] (http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by -pool-map-async)? – zuuz

4

initializer spodziewa się arbitralnego wywołania, które przeprowadza inicjalizację, np. Może ustawić niektóre globale, a nie podklasę Process; map akceptuje dowolne iterowalne:

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))