2011-06-29 14 views
8

Mam następujący problem w Pythonie.Zapisywanie do pliku z wieloprocesowym przetwarzaniem

Muszę wykonać kilka równoległych obliczeń, których wyniki należy zapisywać sekwencyjnie w pliku. Stworzyłem więc funkcję, która odbiera uchwyt multiprocessing.Queue i pliku, wykonać obliczenia i wydrukować wynik w pliku:

import multiprocessing 
from multiprocessing import Process, Queue 
from mySimulation import doCalculation 

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file 

def work(queue, fh): 
while True: 
    try: 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
    except: 
     break 


if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fh = open("foo", "w") 
    workQueue = Queue() 
    parList = # list of conditions for which I want to run doCalculation() 
    for x in parList: 
     workQueue.put(x) 
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)] 
    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 
    fh.close() 

ale plik kończy się pusty po uruchamia skrypt. Próbowałem zmienić funkcję worker() na:

def work(queue, filename): 
while True: 
    try: 
     fh = open(filename, "a") 
     parameter = queue.get(block = False) 
     result = doCalculation(parameter) 
     print >>fh, string 
     fh.close() 
    except: 
     break 

i przekazanie nazwy pliku jako parametru. Wtedy działa tak, jak zamierzałem. Kiedy próbuję zrobić to samo sekwencyjnie, bez wieloprocesowości, działa to również normalnie.

Dlaczego to nie działało w pierwszej wersji? Nie widzę problemu.

Ponadto: czy mogę zagwarantować, że dwa procesy nie spróbują jednocześnie zapisać pliku?


EDIT:

Dzięki. Mam to teraz. To jest działająca wersja:

import multiprocessing 
from multiprocessing import Process, Queue 
from time import sleep 
from random import uniform 

def doCalculation(par): 
    t = uniform(0,2) 
    sleep(t) 
    return par * par # just to simulate some calculation 

def feed(queue, parlist): 
    for par in parlist: 
      queue.put(par) 

def calc(queueIn, queueOut): 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      print "dealing with ", par, "" 
      res = doCalculation(par) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 

if __name__ == "__main__": 
    nthreads = multiprocessing.cpu_count() 
    fname = "foo" 
    workerQueue = Queue() 
    writerQueue = Queue() 
    parlist = [1,2,3,4,5,6,7,8,9,10] 
    feedProc = Process(target = feed , args = (workerQueue, parlist)) 
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)] 
    writProc = Process(target = write, args = (writerQueue, fname)) 


    feedProc.start() 
    for p in calcProc: 
     p.start() 
    writProc.start() 

    feedProc.join() 
    for p in calcProc: 
     p.join() 
    writProc.join() 
+2

Proszę się skupić. Jeden zestaw kodu ** tylko **. Usuń nieaktualny lub nieistotny kod. Proszę unikać używania "Edytuj". Poproszę, aby pytanie było całkowicie jasne, kompletne i spójne. –

Odpowiedz

16

Naprawdę powinieneś użyć dwóch kolejek i trzech oddzielnych rodzajów przetwarzania.

  1. Umieść rzeczy w kolejce nr 1.

  2. Wyciągnij rzeczy z kolejki # 1 i wykonaj obliczenia, wstawiając rzeczy do kolejki # 2. Możesz mieć ich wiele, ponieważ dostają się z jednej kolejki i bezpiecznie umieszczają w innej kolejce.

  3. Wyciągnij materiał z kolejki # 2 i zapisz go w pliku. Musisz mieć dokładnie 1 z nich i nic więcej. "Jest właścicielem" pliku, gwarantuje dostęp atomowy i absolutnie zapewnia, że ​​plik jest napisany w sposób czysty i spójny.

+1

+1 dla kolejek pracowniczych i konsumenckich. Pamiętaj, aby ustawić maksymalny rozmiar w kolejce, a Twoi pracownicy mogą zjeść twoją pamięć i zagłodzić pisarza. – Bittrance

+0

@ S.Lott @Bittrance, proszę spojrzeć na moją edycję. –

+1

Oh nevermind o wielu uruchomieniach ... Jestem na tyle głupi, aby nie zauważyć, że uruchomiłem feedProc i writProc wiele razy. ¬- Poprawiłem kod. Ale wciąż mam pusty plik. –

4

Jeśli ktoś szuka prostego sposobu na zrobienie tego samego, to może ci pomóc. Nie sądzę, że istnieją jakieś wady, aby to zrobić w ten sposób. Jeśli tak, daj mi znać.

import multiprocessing 
import re 

def mp_worker(item): 
    # Do something 
    return item, count 

def mp_handler(): 
    cpus = multiprocessing.cpu_count() 
    p = multiprocessing.Pool(cpus) 
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step. 
    with open('ExampleFile.txt') as f: 
     listX = [line for line in (l.strip() for l in f) if line] 
    with open('results.txt', 'w') as f: 
     for result in p.imap(mp_worker, listX): 
      # (item, count) tuples from worker 
      f.write('%s: %d\n' % result) 

if __name__=='__main__': 
    mp_handler() 

Źródło: Python: Writing to a single file with queue while using multiprocessing Pool

0

Jest to błąd w kodzie zapisu pracownika, jeśli blok jest fałszywa, pracownik nie otrzyma żadnych danych. Powinna być następująca:

par, res = queue.get(block = True) 

Można to sprawdzić poprzez dodanie linii

print "QSize",queueOut.qsize() 

po queueOut.put((par,res))

z blokiem = False można byłoby uzyskanie zwiększając długość kolejki aż kiedykolwiek wypełnia, w odróżnieniu od bloku = True, gdzie zawsze dostajesz "1".

Powiązane problemy