2013-04-09 15 views
5

Nie jestem ekspertem od Pythona, ale udało mi się napisać kod wieloprocesowy, który wykorzystuje wszystkie moje procesory i rdzenie na moim komputerze. Mój kod ładuje bardzo dużą tablicę, około 1,6 GB, i muszę aktualizować tablicę w każdym procesie. Na szczęście aktualizacja polega na dodaniu do obrazu sztucznych gwiazd, a każdy proces ma inny zestaw pozycji obrazu, w którym można dodać sztuczne gwiazdy.Python multiprocessing i wspólna zmienna

Obraz jest zbyt duży i nie mogę utworzyć nowego za każdym razem, gdy wywoływany jest proces. Moim rozwiązaniem było utworzenie zmiennej we wspólnej pamięci i zaoszczędzenie dużej ilości pamięci. Z jakiegoś powodu działa na 90% obrazu, ale są regiony, w których mój kod dodaje losowe liczby w niektórych pozycjach, które wysłałem wcześniej do procesów. Czy jest to związane ze sposobem tworzenia wspólnej zmiennej? Czy procesy zakłócają się nawzajem podczas wykonywania mojego kodu?

Coś dziwnego, że przy użyciu pojedynczego procesora i pojedynczego rdzenia, obrazy są w 100% doskonałe i nie ma żadnych liczb losowych dodanych do obrazu. Czy sugerujesz mi sposób na udostępnienie dużej tablicy pomiędzy wieloma procesami? Tutaj odpowiednia część mojego kodu. Proszę przeczytać wiersz, kiedy zdefiniuję zmienną im_data.

import warnings 
warnings.filterwarnings("ignore") 

from mpl_toolkits.mplot3d import Axes3D 
from matplotlib import cm 
import matplotlib.pyplot as plt 
import sys,os 
import subprocess 
import numpy as np 
import time 
import cv2 as cv 
import pyfits 
from pyfits import getheader 
import multiprocessing, Queue 
import ctypes 

class Worker(multiprocessing.Process): 


def __init__(self, work_queue, result_queue): 

    # base class initialization 
    multiprocessing.Process.__init__(self) 

    # job management stuff 
    self.work_queue = work_queue 
    self.result_queue = result_queue 
    self.kill_received = False 

def run(self): 
    while not self.kill_received: 

     # get a task 
     try: 
      i_range, psf_file = self.work_queue.get_nowait() 
     except Queue.Empty: 
      break 

     # the actual processing 
     print "Adding artificial stars - index range=", i_range 

     radius=16 
     x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2) 
     x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c) 
     distance = np.sqrt(x**2 + y**2) 

     for i in range(i_range[0],i_range[1]): 
      psf_xy=np.zeros(psf_size[1:3], dtype=float) 
      j=0 
      for i_order in range(psf_order+1): 
       j_order=0 
       while (i_order+j_order < psf_order+1): 
        psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order 
        j_order+=1 
        j+=1 


      psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy) 
      psf_xy *= psf_factor 

      npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4) 
      npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy) 
      npsf_xy *= npsf_factor 

      im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])] 
      im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])] 
      npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])] 
      npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])] 

      im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10. 


     self.result_queue.put(id) 

if __name__ == "__main__": 

    n_cpu=2 
    n_core=6 
    n_processes=n_cpu*n_core*1 
    input_mock_file=sys.argv[1] 

    print "Reading file ", im_file[i] 
    hdu=pyfits.open(im_file[i]) 
    data=hdu[0].data 
    im_size=data.shape 

    im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
    im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
    im_data = im_data.reshape(im_size[0], im_size[1]) 
    im_data[:] = data 
    data=0 
    assert im_data.base.base is im_data_base.get_obj() 

    # run 
    # load up work queue 
    tic=time.time() 
    j_step=np.int(np.ceil(mock_n*1./n_processes)) 
    j_range=range(0,mock_n,j_step) 
    j_range.append(mock_n) 


    work_queue = multiprocessing.Queue() 
    for j in range(np.size(j_range)-1): 
    if work_queue.full(): 
     print "Oh no! Queue is full after only %d iterations" % j 
    work_queue.put((j_range[j:j+2], psf_file[i])) 

    # create a queue to pass to workers to store the results 
    result_queue = multiprocessing.Queue() 

    # spawn workers 
    for j in range(n_processes): 
    worker = Worker(work_queue, result_queue) 
    worker.start() 

    # collect the results off the queue 
    while not work_queue.empty(): 
    result_queue.get() 

    print "Writing file ", mock_im_file[i] 
    hdu[0].data=im_data 
    hdu.writeto(mock_im_file[i]) 
    print "%f s for parallel computation." % (time.time() - tic) 
+1

Zamiast udostępniać dużą tablicę, nie można podzielić jej na mniejsze subarrayi i wysłać te subarrayy do podprocesów? A następnie połącz wyniki z oryginalną tablicą. – freakish

+0

A także rozważyć użycie czegoś innego niż Python do przetwarzania tak dużych obrazów (dodatek C?). – freakish

Odpowiedz

3

Myślę, że problem (jak zasugerował to w swoim pytaniu) wynika z faktu, że jesteś pisanie w tej samej tablicy z wielu wątków.

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 

Chociaż jestem pewien, że można napisać do im_data_base w sposób „proces bezpieczny” (a niejawny blokada jest używana przez pytona synchronizować dostęp do tablicy), nie jestem pewien, że można napisać do im_data w sposób bezpieczny dla procesu.

Chciałbym zatem (choć nie jestem pewien, że rozwiąże problem) doradza Ci stworzyć wyraźny blokadę wokół im_data

# Disable python implicit lock, we are going to use our own 
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 
# Create our own lock 
im_data_lock = Lock() 

następnie w procesach, nabywa blokadę za każdym razem trzeba zmodyfikować im_data

self.im_data_lock.acquire() 
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10 
self.im_data_lock.release() 

pominąłem kod blokady przejść do contructor swojego procesu i przechowywać go jako pole użytkownik (self.im_data_lock) przez wzgląd na zwięzłość. Powinieneś także przekazać tablicę im_data do konstruktora procesu i zapisać go jako pole członkowskie.

1

Problem występuje w przykładzie, gdy wiele wątków zapisuje w nakładających się regionach obrazu/tablicy. Rzeczywiście, musisz albo umieścić jedną blokadę na obrazie, albo utworzyć zestaw blokad na sekcje obrazu (aby zmniejszyć rywalizację o blokadę).

Można również tworzyć modyfikacje obrazu w jednym zestawie procesów i wykonywać rzeczywistą modyfikację obrazu w oddzielnym pojedynczym wątku.

Powiązane problemy