Mam dużą rzadką macierz X w formacie scipy.sparse.csr_matrix i chciałbym pomnożyć to przez tablicę W przy użyciu paralelizmu. Po pewnych badaniach odkryłem, że muszę używać Array w trybie wieloprocesowym, aby uniknąć kopiowania X i W między procesami (np. Z tutaj: How to combine Pool.map with Array (shared memory) in Python multiprocessing? i Is shared readonly data copied to different processes for Python multiprocessing?). Oto moja najnowsza próbaJak paralelizować mnożenie scipy macierzy rzadkiej
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
jednak wyjście jest coś takiego: (4,431, 0,165), co wskazuje wersję równoległego jest znacznie wolniejszy niż non-równoległego mnożenia.
Wierzę, że spowolnienie może być spowodowane w podobnych sytuacjach, gdy kopiowane są duże dane do procesów, ale tak nie jest w tym przypadku, ponieważ używam Array do przechowywania wspólnych zmiennych (chyba że dzieje się to w numpy.frombuffer lub gdy tworzenie csr_matrix, ale wtedy nie mogłem znaleźć sposobu na udostępnienie csr_matrix bezpośrednio). Inną możliwą przyczyną powolnej prędkości jest zwracanie dużego wyniku każdego mnożenia macierzy dla każdego procesu, jednak nie jestem pewien w jaki sposób obejść to.
Czy ktoś może zobaczyć, gdzie się źle dzieje? Dzięki za pomoc!
Aktualizacja: Nie mogę być pewien, ale myślę, że dzielenie się dużymi ilościami danych między procesami nie jest tak wydajne, a najlepiej powinienem używać wielowątkowości (chociaż Globalny Blok Tłumaczeń (GIL) bardzo to utrudnia). Jednym ze sposobów obejścia tego jest zwolnienie GIL przy użyciu na przykład Cythona (patrz http://docs.cython.org/src/userguide/parallelism.html), chociaż wiele funkcji numpy musi przejść przez GIL.
Czy masz numpy/scipy połączone ze zoptymalizowaną, wielowątkową kompilacją ATLAS?Jeśli to zrobisz, powinieneś otrzymać równoległe mnożenie macierzy za darmo, gdy użyjesz np.dot. –
Używam wielowątkowej biblioteki BLAS (OpenBLAS) połączonej z numpy/scipy, ale przetestowałem X.dot (W) i numpy.dot (X, W) (ta ostatnia nie działa dla rzadkich X), a to nie jest sparaliżowany. – Charanpal