2015-07-29 11 views
5

Próbuję podzielić na pętli tjProcess.join() i kolejka nie działają z dużą liczbą

N = 1000000 
for i in xrange(N): 
    #do something 

wykorzystaniem multiprocessing.Process i to działa dobrze dla małych wartości N. Problem pojawia się, gdy Używam większych wartości N. Coś dziwnego dzieje się przed lub w trakcie p.join() i program nie odpowiada. Jeśli umieściłem print i, zamiast q.put (i) w definicji funkcji f wszystko działa dobrze.

Byłbym wdzięczny za każdą pomoc. Oto kod.

from multiprocessing import Process, Queue 

def f(q,nMin, nMax): # function for multiprocessing 
    for i in xrange(nMin,nMax): 
     q.put(i) 

if __name__ == '__main__': 

    nEntries = 1000000 

    nCpu = 10 
    nEventsPerCpu = nEntries/nCpu 
    processes = [] 

    q = Queue() 

    for i in xrange(nCpu): 
     processes.append(Process(target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu))) 

    for p in processes: 
     p.start() 

    for p in processes: 
     p.join() 

    print q.qsize() 

Odpowiedz

8

Próbujesz rozwijać swoją kolejkę bez granic, i dołączają do podproces że czeka na sali w kolejce, więc główny proces jest zablokowany czekając, że ktoś do wypełnienia, i nigdy będzie.

Jeśli wyciągniesz dane z kolejki przed połączeniem, to będzie działać poprawnie.

Jedną z technik można użyć coś takiego:

while 1: 
    running = any(p.is_alive() for p in processes) 
    while not queue.empty(): 
     process_queue_data() 
    if not running: 
     break 

Zgodnie z dokumentacją, p.is_alive() należy wykonać niejawna przyłączyć, ale również wydaje się sugerować, że najlepszą praktyką może być jawnie wykonać sprzężenia na wszystkich wątkach po tym.

Edycja: Chociaż jest to całkiem jasne, może nie być to wszystko, co jest wydajne. W jaki sposób sprawisz, że będzie on działał lepiej, będzie wymagał dużej ilości zadań i specyfiki maszyny (i generalnie nie powinieneś tworzyć wielu procesów jednocześnie, chyba że niektóre będą blokowane na I/O).

Oprócz zmniejszenia liczby procesów do liczby procesorów, kilka prostych poprawek, aby nieco szybciej (ponownie, w zależności od okoliczności) może wyglądać następująco:

liveprocs = list(processes) 
while liveprocs: 
    try: 
     while 1: 
      process_queue_data(q.get(False)) 
    except Queue.Empty: 
     pass 

    time.sleep(0.5) # Give tasks a chance to put more data in 
    if not q.empty(): 
     continue 
    liveprocs = [p for p in liveprocs if p.is_alive()] 
+0

Dziękujemy! To działa. – Puibo

+0

Wysyłam mój skrypt do maszyny, która ma około 30 procesorów, więc przy 10 procesach wciąż jestem daleko od max. Czy są jakieś inne powody, dla których powinienem zmniejszyć liczbę procesów? Przeprowadzam analizę danych (50 g danych, czyli około 9 milionów zdarzeń). Moim pomysłem było podzielenie danych na części (na przykład 10) i użycie przetwarzania wieloprocesowego. Jeśli masz jakąkolwiek radę, byłbym wdzięczny. – Puibo

+0

Więcej procesów jest dobre w stosunku do liczby procesorów - nawet po liczbie procesorów, jeśli procesy będą czasami blokowane. Sposób, w jaki sformułowano twoje wstępne pytanie, myślałam, że może to był problem z zadaniami domowymi programowania - nie wiedziałeś, że masz potężną maszynę :) W każdym razie, jedną miarą do rozważenia jest to, ile przyspieszasz, robiąc rzeczy jednotomowe - - jeśli masz 10-krotne przyspieszenie z 10 procesami (mało prawdopodobne), to świetnie! Zredukowanie zależności (oczekiwania) między procesami jest kluczowe - jak już zauważyłeś, musisz drenować kolejkę. –

Powiązane problemy