2013-02-27 23 views
8

Otrzymuję błąd systemowy (pokazany poniżej) podczas równoległego wykonywania prostych obliczeń opartych na algebrze macierzy opartej na numpy przy użyciu pakietu wieloprocesorowego (python 2.73 z numpy 1.7.0 na Ubuntu 12.04 na Amazon EC2) . Mój kod działa dobrze dla mniejszych rozmiarów matrycy, ale zawiesza się dla większych (z dużą ilością dostępnej pamięci).Błąd systemu podczas wykonywania podprocesów przy użyciu procesu wieloprocesowego

Rozmiar używanych macierzy jest znaczny (mój kod działa dobrze dla 1000000x10 gęstych macierzy, ale zawiesza się dla 1000000x500 - I przechodzę te macierze do/z podprocesów przy okazji). 10 vs 500 jest parametrem wykonawczym, wszystko inne pozostaje takie samo (dane wejściowe, inne parametry czasu wykonywania itp.)

Próbowałem również uruchomić ten sam (przeniesiony) kod przy użyciu python3 - dla większych macierzy podprocesy przechodzą w tryb uśpienia/bezczynności (zamiast awarii, jak w pytonie 2.7), a program/podprocesy po prostu zawieszają się, nie robiąc nic. Dla mniejszych macierzy kod działa dobrze z python3.

Wszelkie sugestie będą bardzo mile widziane (używam z pomysłów tutaj)

wiadomość

Błąd:

Exception in thread Thread-5: Traceback (most recent call last): 
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() File "/usr/lib/python2.7/threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks 
    put(task) SystemError: NULL result without error in PyObject_Call 

Kod Multiprocessing używam:

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses): 
    if len(listOfInputs) == 0: 
     return 
    # Add result queue to the list of argument tuples. 
    resultQueue = mp.Manager().Queue() 
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs] 
    # Create and initialize the pool of workers. 
    pool = mp.Pool(processes = nParallelProcesses) 
    pool.map(proc, listOfInputsNew) 
    # Run the processes. 
    pool.close() 
    pool.join() 
    # Return the results. 
    return [resultQueue.get() for i in range(len(listOfInputs))] 

Poniżej znajduje się " proc ", który zostanie wykonany dla każdego podprocesu. Zasadniczo rozwiązuje wiele układów równań liniowych za pomocą numpy (konstruuje wymagane macierze wewnątrz podprocesu) i zwraca wyniki jako kolejną macierz. Po raz kolejny działa dobrze dla mniejszych wartości jednego parametru wykonawczego, ale zawiesza się (lub zawiesza się w python3) dla większych.

def solveForLFV(param): 
    startTime = time.time() 
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param 
    LFoutChunkSize = XY.shape[0] 
    nLFdim = LFVin.shape[1] 
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim)) 
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim)) 
    for LFVoutIndex in xrange(LFoutChunkSize): 
     LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex] 
     sumLFVinOuterProductLFVpurch[:, :] = 0. 
     LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize) 
     for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)): 
      LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :] 
      sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]) 
     LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :]) 
    queue.put((chunkI, LFVoutChunk)) 
    print 'solveForLFV: ', time.time() - startTime, 'sec' 
    sys.stdout.flush() 
+0

Czy możesz udostępnić kod funkcji proc? – barracel

+0

Po prostu zrobiłem. Nie opisałem argumentów procesu - niektóre z nich są macierzami, niektóre są listami, a niektóre są tylko liczbami/liczbami całkowitymi. 'kolejka' służy do zwracania wyników z każdego podprocesu. – Yevgeny

Odpowiedz

5

500,000,000 jest dość duże: jeśli używasz float64, to 4 miliardy bajtów lub około 4 GB. (10 000 000 macierzy zmiennoprzecinkowej będzie wynosić 80 milionów bajtów, czyli około 80 MB - znacznie mniej.) Spodziewam się, że problem ma coś wspólnego z procesem wieloprocesowym, próbując zebrać tablice i przesłać je do podprocesów przez potok.

Odkąd korzystasz z platformy uniksowej, możesz uniknąć tego zachowania, wykorzystując zachowanie związane z pamięcią dziedziczenia fork() (używane do tworzenia pracowników wieloprocesorowych). Miałem wielki sukces z tym hackem (wyrwany z this project), opisany przez komentarze.

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count()) 
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name] 
Powiązane problemy