Otrzymuję błąd systemowy (pokazany poniżej) podczas równoległego wykonywania prostych obliczeń opartych na algebrze macierzy opartej na numpy przy użyciu pakietu wieloprocesorowego (python 2.73 z numpy 1.7.0 na Ubuntu 12.04 na Amazon EC2) . Mój kod działa dobrze dla mniejszych rozmiarów matrycy, ale zawiesza się dla większych (z dużą ilością dostępnej pamięci).Błąd systemu podczas wykonywania podprocesów przy użyciu procesu wieloprocesowego
Rozmiar używanych macierzy jest znaczny (mój kod działa dobrze dla 1000000x10 gęstych macierzy, ale zawiesza się dla 1000000x500 - I przechodzę te macierze do/z podprocesów przy okazji). 10 vs 500 jest parametrem wykonawczym, wszystko inne pozostaje takie samo (dane wejściowe, inne parametry czasu wykonywania itp.)
Próbowałem również uruchomić ten sam (przeniesiony) kod przy użyciu python3 - dla większych macierzy podprocesy przechodzą w tryb uśpienia/bezczynności (zamiast awarii, jak w pytonie 2.7), a program/podprocesy po prostu zawieszają się, nie robiąc nic. Dla mniejszych macierzy kod działa dobrze z python3.
Wszelkie sugestie będą bardzo mile widziane (używam z pomysłów tutaj)
wiadomośćBłąd:
Exception in thread Thread-5: Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run() File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task) SystemError: NULL result without error in PyObject_Call
Kod Multiprocessing używam:
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
if len(listOfInputs) == 0:
return
# Add result queue to the list of argument tuples.
resultQueue = mp.Manager().Queue()
listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
# Create and initialize the pool of workers.
pool = mp.Pool(processes = nParallelProcesses)
pool.map(proc, listOfInputsNew)
# Run the processes.
pool.close()
pool.join()
# Return the results.
return [resultQueue.get() for i in range(len(listOfInputs))]
Poniżej znajduje się " proc ", który zostanie wykonany dla każdego podprocesu. Zasadniczo rozwiązuje wiele układów równań liniowych za pomocą numpy (konstruuje wymagane macierze wewnątrz podprocesu) i zwraca wyniki jako kolejną macierz. Po raz kolejny działa dobrze dla mniejszych wartości jednego parametru wykonawczego, ale zawiesza się (lub zawiesza się w python3) dla większych.
def solveForLFV(param):
startTime = time.time()
(chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
LFoutChunkSize = XY.shape[0]
nLFdim = LFVin.shape[1]
sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
for LFVoutIndex in xrange(LFoutChunkSize):
LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
sumLFVinOuterProductLFVpurch[:, :] = 0.
LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
queue.put((chunkI, LFVoutChunk))
print 'solveForLFV: ', time.time() - startTime, 'sec'
sys.stdout.flush()
Czy możesz udostępnić kod funkcji proc? – barracel
Po prostu zrobiłem. Nie opisałem argumentów procesu - niektóre z nich są macierzami, niektóre są listami, a niektóre są tylko liczbami/liczbami całkowitymi. 'kolejka' służy do zwracania wyników z każdego podprocesu. – Yevgeny