2013-03-20 19 views
6

Wiem, że istnieje wiele postów na temat wymiany stosów związanych z pisaniem wyników z wieloprocesowości do pojedynczego pliku i opracowałem mój kod po przeczytaniu tylko tych postów. Co staram się osiągnąć, to uruchomić funkcję "RevMapCoord" równolegle i zapisać jego wynik w jednym pliku przy użyciu multiprocess.queue. Ale mam problem w kolejce do mojej pracy. Mój kod:Wieloprocesorowość w Pythonie przy użyciu kolejki do zapisu do tego samego pliku

def RevMapCoord(list): 
    "Read a file, Find String and Do something" 

def feed(queue, parlist): 
    for par in parlist: 
     print ('Echo from Feeder: %s' % (par)) 
     queue.put(par) 
    print ('**Feeder finished queing**') 

def calc(queueIn, queueOut): 
    print ('Worker function started') 
    while True: 
     try: 
      par = queueIn.get(block = False) 
      res = RevMapCoord(final_res) 
      queueOut.put((par,res)) 
     except: 
      break 

def write(queue, fname): 
    fhandle = open(fname, "w") 
    while True: 
     try: 
      par, res = queue.get(block = False) 
      print >>fhandle, par, res 
     except: 
      break 
    fhandle.close() 


feedProc = Process(target = feed , args = (workerQueue, final_res)) 
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)] 
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno)) 

feedProc.start() 
print ('Feeder is joining') 
feedProc.join() 
for p in calcProc: 
    p.start() 
for p in calcProc: 
    p.join() 
writProc.start() 
writProc.join() 

Po uruchomieniu tego skryptu kodu przechodzimy do kroku "feedProc.start()". W ciągu ostatnich kilku linie wyjściowe z ekranu pokazuje oświadczenie wydruku od końca „feedProc.start()”:

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne 
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine 
**Feeder finished queing** 

Ale wisi przed wykonaniem następnego wiersza „feedProc.join()”. Kod nie daje żadnego błędu i działa dalej, ale nic nie robi (zawiesza się). Proszę powiedz mi, jaki błąd popełniam.

Odpowiedz

0

osiągnąłem piśmie wyniki.. z wieloprocesorowe w jednym pliku przez uing funkcję „map_async” w Python3 Oto funkcja pisałem.

def PPResults(module,alist):##Parallel processing 
    npool = Pool(int(nproc))  
    res = npool.map_async(module, alist) 
    results = (res.get())###results returned in form of a list 
    return results 

Więc zapewnić tę funkcję z listą parametrów w „a_list” i „moduł” jest funkcja, która wykonuje przetwarzanie i zwraca wyniki. Powyższa funkcja nadal zbiera wyniki w formie listy i zwraca z powrotem, gdy wszystkie para liczniki z "a_list" zostały przetworzone. Wyniki mogą nie być poprawne, ale ponieważ zamówienie nie było dla mnie ważne, działało to dobrze. Lista „wynik” można powtórzyć i indywidualne wyniki zapisany w pliku jak:

fh_out = open('./TestResults', 'w') 
for i in results:##Write Results from list to file 
    fh_out.write(i) 

Aby zachować kolejność wyników może musimy użyć „kolejek” podobna do już wspomniałem w moim pytaniu (powyżej). Chociaż jestem w stanie naprawić kod, ale uważam, że nie jest to konieczne, aby być tu wspomnianym.

Dzięki

AK

9

Myślę, że powinieneś schudnąć swój przykład do podstaw. Na przykład:

from multiprocessing import Process, Queue 

def f(q): 
    q.put('Hello') 
    q.put('Bye') 
    q.put(None) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    with open('file.txt', 'w') as fp: 
     while True: 
      item = q.get() 
      print(item) 
      if item is None: 
       break 
      fp.write(item) 
    p.join() 

Mam tutaj dwa procesy (główny proces, a p). p umieszcza ciągi w kolejce, które są pobierane przez główny proces. Gdy główny proces znajdzie Brak (wartownik, że używam do wskazania: „Skończyłem” przerywa pętlę

Rozszerzenie to do wielu procesów (lub wątków) jest trywialne

+2

należy spróbować uruchomić swój przykład (to daje błąd). W ten sposób nie można umieścić wielu pozycji w kolejce. Właśnie włożyłeś jeden element - listę. – Gerrat

+0

'TypeError: oczekiwano obiektu bufora znaków' Mam err: | – nk9

+1

@ b1- * Nowa * (i prawidłowa, dzięki Gerrat) wersja działa z pythonem 2.7.5 i 3.2.3. Spróbuj! – Hernan

Powiązane problemy