2013-05-29 9 views
8

Mam dużą rzadką macierz X w formacie scipy.sparse.csr_matrix i chciałbym pomnożyć to przez tablicę W przy użyciu paralelizmu. Po pewnych badaniach odkryłem, że muszę używać Array w trybie wieloprocesowym, aby uniknąć kopiowania X i W między procesami (np. Z tutaj: How to combine Pool.map with Array (shared memory) in Python multiprocessing? i Is shared readonly data copied to different processes for Python multiprocessing?). Oto moja najnowsza próbaJak paralelizować mnożenie scipy macierzy rzadkiej

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp): 
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray 
    global WShape 

    WArray = Warr  
    WShape = Wshp 

def dot2(args): 
    rowInds, i = args  

    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float) 
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32) 
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32) 
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape) 

    global WArray 
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape) 

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W) 

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count() 
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int) 

    #Store the data in X as RawArray objects so we can share it amoung processes 
    XData = multiprocessing.RawArray("d", X.data) 
    XIndices = multiprocessing.RawArray("i", X.indices) 
    XIndptr = multiprocessing.RawArray("i", X.indptr) 

    def matmat(W): 
     WArray = multiprocessing.RawArray("d", W.flatten()) 
     pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
     params = [] 

     for i in range(numJobs): 
      params.append((rowInds, i)) 

     iterator = pool.map(dot2, params) 
     P = numpy.zeros((X.shape[0], W.shape[1])) 

     for i in range(numJobs): 
      P[rowInds[i]:rowInds[i+1], :] = iterator[i] 

     return P 

    return matmat 

if __name__ == '__main__': 
    #Create a random sparse matrix X and a random dense one W  
    X = scipy.sparse.rand(10000, 8000, 0.1) 
    X = X.tocsr() 
    W = numpy.random.rand(8000, 20) 

    startTime = time.time() 
    A = getMatmat(X)(W) 
    parallelTime = time.time()-startTime 

    startTime = time.time() 
    B = X.dot(W) 
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime) 

jednak wyjście jest coś takiego: (4,431, 0,165), co wskazuje wersję równoległego jest znacznie wolniejszy niż non-równoległego mnożenia.

Wierzę, że spowolnienie może być spowodowane w podobnych sytuacjach, gdy kopiowane są duże dane do procesów, ale tak nie jest w tym przypadku, ponieważ używam Array do przechowywania wspólnych zmiennych (chyba że dzieje się to w numpy.frombuffer lub gdy tworzenie csr_matrix, ale wtedy nie mogłem znaleźć sposobu na udostępnienie csr_matrix bezpośrednio). Inną możliwą przyczyną powolnej prędkości jest zwracanie dużego wyniku każdego mnożenia macierzy dla każdego procesu, jednak nie jestem pewien w jaki sposób obejść to.

Czy ktoś może zobaczyć, gdzie się źle dzieje? Dzięki za pomoc!

Aktualizacja: Nie mogę być pewien, ale myślę, że dzielenie się dużymi ilościami danych między procesami nie jest tak wydajne, a najlepiej powinienem używać wielowątkowości (chociaż Globalny Blok Tłumaczeń (GIL) bardzo to utrudnia). Jednym ze sposobów obejścia tego jest zwolnienie GIL przy użyciu na przykład Cythona (patrz http://docs.cython.org/src/userguide/parallelism.html), chociaż wiele funkcji numpy musi przejść przez GIL.

+0

Czy masz numpy/scipy połączone ze zoptymalizowaną, wielowątkową kompilacją ATLAS?Jeśli to zrobisz, powinieneś otrzymać równoległe mnożenie macierzy za darmo, gdy użyjesz np.dot. –

+1

Używam wielowątkowej biblioteki BLAS (OpenBLAS) połączonej z numpy/scipy, ale przetestowałem X.dot (W) i numpy.dot (X, W) (ta ostatnia nie działa dla rzadkich X), a to nie jest sparaliżowany. – Charanpal

Odpowiedz

1

Najlepszym wyjściem jest zejście do C z Cython. W ten sposób możesz pokonać GIL i używać OpenMP. Nie dziwię się, że proces wieloprocesowy jest wolniejszy - jest tam dużo napięć.

Oto naiwne opakowanie otoki OpenMP z rzadkiej macierzy CSparse - wektor kodu produktu w python.

Na moim laptopie działa to trochę szybciej niż scipy. Ale nie mam tak wielu rdzeni. Kod, w tym skrypt setup.py i pliki nagłówkowe C i takie tam, jest w tej kwestii: https://gist.github.com/rmcgibbo/6019670

Podejrzewam, że jeśli naprawdę chcesz, aby kod równoległy był szybki (na moim laptopie, jest tylko o 20% szybszy niż single-threaded scipy, nawet przy użyciu 4 wątków), musisz uważniej zastanowić się, gdzie dzieje się równoległość niż ja, zwracając uwagę na lokalizację pamięci podręcznej.

# psparse.pyx 

#----------------------------------------------------------------------------- 
# Imports 
#----------------------------------------------------------------------------- 
cimport cython 
cimport numpy as np 
import numpy as np 
import scipy.sparse 
from libc.stddef cimport ptrdiff_t 
from cython.parallel import parallel, prange 

#----------------------------------------------------------------------------- 
# Headers 
#----------------------------------------------------------------------------- 

ctypedef int csi 

ctypedef struct cs: 
    # matrix in compressed-column or triplet form 
    csi nzmax  # maximum number of entries 
    csi m   # number of rows 
    csi n   # number of columns 
    csi *p   # column pointers (size n+1) or col indices (size nzmax) 
    csi *i   # row indices, size nzmax 
    double *x  # numerical values, size nzmax 
    csi nz   # # of entries in triplet matrix, -1 for compressed-col 

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil 
cdef extern csi cs_print (cs *A, csi brief) nogil 

assert sizeof(csi) == 4 

#----------------------------------------------------------------------------- 
# Functions 
#----------------------------------------------------------------------------- 

@cython.boundscheck(False) 
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None): 
    """Multiply a sparse CSC matrix by a dense matrix 

    Parameters 
    ---------- 
    X : scipy.sparse.csc_matrix 
     A sparse matrix, of size N x M 
    W : np.ndarray[dtype=float564, ndim=2, mode='fortran'] 
     A dense matrix, of size M x P. Note, W must be contiguous and in 
     fortran (column-major) order. You can ensure this using 
     numpy's `asfortranarray` function. 

    Returns 
    ------- 
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size N x P, the result of multiplying X by W. 

    Notes 
    ----- 
    This function is parallelized over the columns of W using OpenMP. You 
    can control the number of threads at runtime using the OMP_NUM_THREADS 
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis. 
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the 
    GNU LGPL v2.1+. 

    References 
    ---------- 
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems 
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136 
    """ 
    if X.shape[1] != W.shape[0]: 
     raise ValueError('matrices are not aligned') 

    cdef int i 
    cdef cs csX 
    cdef np.ndarray[double, ndim=2, mode='fortran'] result 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices 
    cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data 

    # Pack the scipy data into the CSparse struct. This is just copying some 
    # pointers. 
    csX.nzmax = X.data.shape[0] 
    csX.m = X.shape[0] 
    csX.n = X.shape[1] 
    csX.p = &indptr[0] 
    csX.i = &indices[0] 
    csX.x = &data[0] 
    csX.nz = -1 # to indicate CSC format 

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double) 
    for i in prange(W.shape[1], nogil=True): 
     # X is in fortran format, so we can get quick access to each of its 
     # columns 
     cs_gaxpy(&csX, &W[0, i], &result[0, i]) 

    return result 

To wywołuje niektóre C z CSparse.

// src/cs_gaxpy.c 

#include "cs.h" 
/* y = A*x+y */ 
csi cs_gaxpy (const cs *A, const double *x, double *y) 
{ 
    csi p, j, n, *Ap, *Ai ; 
    double *Ax ; 
    if (!CS_CSC (A) || !x || !y) return (0) ;  /* check inputs */ 
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ; 
    for (j = 0 ; j < n ; j++) 
    { 
     for (p = Ap [j] ; p < Ap [j+1] ; p++) 
     { 
     y [Ai [p]] += Ax [p] * x [j] ; 
     } 
    } 
    return (1) ; 
} 
+0

Dzięki za odpowiedź! Miałem podobne myśli i napisałem produkt dot. Cython/OpenMP oparty na Eigen (patrz pdot2d https://github.com/charanpald/sppy/blob/master/sppy/csarray_base.pyx). Tutaj dzielę rzędy X na bloki cpu_count i na moim 8 rdzeniowym komputerze działa to około 2x szybciej (jestem pewien, że można to poprawić). Porównam z twoim rozwiązaniem, gdy tylko rozwiążę kilka problemów z kompilacją. – Charanpal

1

Może trochę późno z odpowiedzią. Możliwe jest uzyskanie niezawodnych równoległych przyspieszeń za pomocą pakietu pyTrilinos, który zapewnia pakowanie python do wielu funkcji w Trilinos. Oto Twój przykład przekształcany w użyciu pyTrilinos:

from PyTrilinos import Epetra 
from scipy.sparse import rand 
import numpy as np 

n_rows = 10000 
n_cols = 8000 
n_vecs = 20 
fill_factor = 0.1 

comm = Epetra.PyComm() 
my_id = comm.MyPID() 

row_map = Epetra.Map(n_rows, 0, comm) 
out_vec_map = row_map 
in_vec_map = Epetra.Map(n_cols, 0, comm) 
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm) 

n_local_rows = row_map.NumMyElements() 

# Create local block matrix in scipy and convert to Epetra 
X = rand(n_local_rows, n_cols, fill_factor).tocoo() 

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True) 
A.InsertMyValues(X.row, X.col, X.data) 
A.FillComplete() 

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs) 
V = Epetra.MultiVector(in_vec_map, n_vecs) 

V[:] = W.T # order of indices is opposite 

B = Epetra.MultiVector(out_vec_map, n_vecs) 

# Multiply 
A.Multiply(False, V, B) 

Następnie można uruchomić ten kod za pomocą MPI

mpiexec -n 2 python scipy_to_trilinos.py 

Inne przykłady PyTrilinos można znaleźć w repozytorium github here. Oczywiście, gdyby użyć pyTrilinos, ten sposób inicjacji macierzy za pomocą scipy może nie być najbardziej optymalny.

Powiązane problemy