Mam następujący problem w Pythonie.Zapisywanie do pliku z wieloprocesowym przetwarzaniem
Muszę wykonać kilka równoległych obliczeń, których wyniki należy zapisywać sekwencyjnie w pliku. Stworzyłem więc funkcję, która odbiera uchwyt multiprocessing.Queue
i pliku, wykonać obliczenia i wydrukować wynik w pliku:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation
# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file
def work(queue, fh):
while True:
try:
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
except:
break
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fh = open("foo", "w")
workQueue = Queue()
parList = # list of conditions for which I want to run doCalculation()
for x in parList:
workQueue.put(x)
processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
for p in processes:
p.start()
for p in processes:
p.join()
fh.close()
ale plik kończy się pusty po uruchamia skrypt. Próbowałem zmienić funkcję worker() na:
def work(queue, filename):
while True:
try:
fh = open(filename, "a")
parameter = queue.get(block = False)
result = doCalculation(parameter)
print >>fh, string
fh.close()
except:
break
i przekazanie nazwy pliku jako parametru. Wtedy działa tak, jak zamierzałem. Kiedy próbuję zrobić to samo sekwencyjnie, bez wieloprocesowości, działa to również normalnie.
Dlaczego to nie działało w pierwszej wersji? Nie widzę problemu.
Ponadto: czy mogę zagwarantować, że dwa procesy nie spróbują jednocześnie zapisać pliku?
EDIT:
Dzięki. Mam to teraz. To jest działająca wersja:
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform
def doCalculation(par):
t = uniform(0,2)
sleep(t)
return par * par # just to simulate some calculation
def feed(queue, parlist):
for par in parlist:
queue.put(par)
def calc(queueIn, queueOut):
while True:
try:
par = queueIn.get(block = False)
print "dealing with ", par, ""
res = doCalculation(par)
queueOut.put((par,res))
except:
break
def write(queue, fname):
fhandle = open(fname, "w")
while True:
try:
par, res = queue.get(block = False)
print >>fhandle, par, res
except:
break
fhandle.close()
if __name__ == "__main__":
nthreads = multiprocessing.cpu_count()
fname = "foo"
workerQueue = Queue()
writerQueue = Queue()
parlist = [1,2,3,4,5,6,7,8,9,10]
feedProc = Process(target = feed , args = (workerQueue, parlist))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target = write, args = (writerQueue, fname))
feedProc.start()
for p in calcProc:
p.start()
writProc.start()
feedProc.join()
for p in calcProc:
p.join()
writProc.join()
Proszę się skupić. Jeden zestaw kodu ** tylko **. Usuń nieaktualny lub nieistotny kod. Proszę unikać używania "Edytuj". Poproszę, aby pytanie było całkowicie jasne, kompletne i spójne. –