Nie jestem ekspertem od Pythona, ale udało mi się napisać kod wieloprocesowy, który wykorzystuje wszystkie moje procesory i rdzenie na moim komputerze. Mój kod ładuje bardzo dużą tablicę, około 1,6 GB, i muszę aktualizować tablicę w każdym procesie. Na szczęście aktualizacja polega na dodaniu do obrazu sztucznych gwiazd, a każdy proces ma inny zestaw pozycji obrazu, w którym można dodać sztuczne gwiazdy.Python multiprocessing i wspólna zmienna
Obraz jest zbyt duży i nie mogę utworzyć nowego za każdym razem, gdy wywoływany jest proces. Moim rozwiązaniem było utworzenie zmiennej we wspólnej pamięci i zaoszczędzenie dużej ilości pamięci. Z jakiegoś powodu działa na 90% obrazu, ale są regiony, w których mój kod dodaje losowe liczby w niektórych pozycjach, które wysłałem wcześniej do procesów. Czy jest to związane ze sposobem tworzenia wspólnej zmiennej? Czy procesy zakłócają się nawzajem podczas wykonywania mojego kodu?
Coś dziwnego, że przy użyciu pojedynczego procesora i pojedynczego rdzenia, obrazy są w 100% doskonałe i nie ma żadnych liczb losowych dodanych do obrazu. Czy sugerujesz mi sposób na udostępnienie dużej tablicy pomiędzy wieloma procesami? Tutaj odpowiednia część mojego kodu. Proszę przeczytać wiersz, kiedy zdefiniuję zmienną im_data.
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes
class Worker(multiprocessing.Process):
def __init__(self, work_queue, result_queue):
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False
def run(self):
while not self.kill_received:
# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break
# the actual processing
print "Adding artificial stars - index range=", i_range
radius=16
x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2)
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)
for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1
psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor
npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor
im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.
self.result_queue.put(id)
if __name__ == "__main__":
n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]
print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()
# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil(mock_n*1./n_processes))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)
work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put((j_range[j:j+2], psf_file[i]))
# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()
# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()
# collect the results off the queue
while not work_queue.empty():
result_queue.get()
print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)
Zamiast udostępniać dużą tablicę, nie można podzielić jej na mniejsze subarrayi i wysłać te subarrayy do podprocesów? A następnie połącz wyniki z oryginalną tablicą. – freakish
A także rozważyć użycie czegoś innego niż Python do przetwarzania tak dużych obrazów (dodatek C?). – freakish