Używam procesu i kolejki multiprocessingu. Rozpoczynam kilka funkcji równolegle i większość zachowuje się ładnie: kończą, ich wyjście przechodzi do kolejki i pojawiają się jako .is_alive() == Fałsz. Ale z jakiegoś powodu kilka funkcji nie działa. Zawsze pokazują one .is_alive() == Prawda, nawet po ostatnim wierszu funkcji (polecenie print "Finished") zostało zakończone. Dzieje się tak niezależnie od uruchamianych przeze mnie funkcji, nawet jeśli istnieje tylko jeden. Jeśli nie działają równolegle, funkcje zachowują się poprawnie i powracają normalnie. Co może być przyczyną problemu?Python Multiprocessing: niektóre funkcje nie zwracają się, gdy są kompletne (materiał kolejki jest zbyt duży)
Oto ogólna funkcja, której używam do zarządzania zadaniami. Wszystko, czego nie pokazuję, to funkcje, które im przekazuję. Są długie, często używają matplotlib, czasami uruchamiają niektóre polecenia powłoki, ale nie wiem, co mają wspólnego te błędy.
def runFunctionsInParallel(listOf_FuncAndArgLists):
"""
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
"""
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
return
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
n=1
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
n+=1
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
print('---------------------------------------------------\n')
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
Całe ujęcie w ciemności: Czy te wiszące zwracają wartość? (dosłownie, czy mają w nich 'return'?) – Logan
Wszystkie funkcje, dobre i złe, zwracają pojedynczy (długi) ciąg znaków. – CPBL
Jeśli jednak wyeliminuję użycie kolejki, problem zniknie. Więc ... kolejka została wypełniona. Mogę na to patrzeć i wygląda dobrze, ale jakoś praca nie kończy się, gdy istnieje powiązana kolejka (i tylko dla "złych" funkcji). – CPBL