
How to parallelise scipy sparse matrix multiplication

I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a numpy array W, making use of parallelism. After some research I found that I need to use Array in multiprocessing to avoid copying X and W between processes (e.g. from How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp): 
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray 
    global WShape 

    WArray = Warr  
    WShape = Wshp 

def dot2(args): 
    rowInds, i = args  

    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    data = numpy.frombuffer(XData, dtype=numpy.float64) 
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32) 
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32) 
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape) 

    global WArray 
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape) 

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W) 

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count() 
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64) 

    # Store the data in X as RawArray objects so we can share it among processes 
    XData = multiprocessing.RawArray("d", X.data) 
    XIndices = multiprocessing.RawArray("i", X.indices) 
    XIndptr = multiprocessing.RawArray("i", X.indptr) 

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
        params = []

        for i in range(numJobs):
            params.append((rowInds, i))

        results = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1]))

        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = results[i]

        return P

    return matmat 

if __name__ == '__main__': 
    #Create a random sparse matrix X and a random dense one W  
    X = scipy.sparse.rand(10000, 8000, 0.1) 
    X = X.tocsr() 
    W = numpy.random.rand(8000, 20) 

    startTime = time.time() 
    A = getMatmat(X)(W) 
    parallelTime = time.time()-startTime 

    startTime = time.time() 
    B = X.dot(W) 
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime) 

However, the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.

I believe a slowdown can occur in similar situations when one copies large data into the processes, but that should not be the case here since I use Array to store the shared variables (unless the copying happens in numpy.frombuffer or when the csr_matrix is created, but I could not find a way to share a csr_matrix directly). Another possible cause of the slowness is that each process returns a large result for its piece of the matrix multiplication, and I am not sure of a good way around this.
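
One possibility I can think of (a sketch only, not benchmarked) is to share the output buffer P as a RawArray as well, so each worker writes its row block in place rather than sending it back through the pool's result pickling. The hypothetical dot2_inplace below assumes initProcess also sets PArray and PShape globals:

def dot2_inplace(args):
    rowInds, i = args

    # Rebuild the shared csr_matrix and W exactly as in dot2
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    # Wrap the shared output buffer and write this row block straight into it
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape(PShape)
    P[rowInds[i]:rowInds[i+1], :] = Xr[rowInds[i]:rowInds[i+1], :].dot(W)

matmat would then allocate PArray = multiprocessing.RawArray("d", X.shape[0] * W.shape[1]) up front, pass it (and the shape) through the pool initializer, and read P back with numpy.frombuffer after pool.map finishes.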

Can anyone see where I am going wrong? Thanks for any help!

Update: I cannot be sure, but I think sharing large amounts of data between processes is simply not very efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that difficult). One way around this is to release the GIL with Cython (see http://docs.cython.org/src/userguide/parallelism.html), although a lot of the numpy functions need to go through the GIL.


Do you have numpy/scipy linked against an optimised, multithreaded ATLAS build? If you do, you should get parallel matrix multiplication for free when you use np.dot. –


I am using a multithreaded BLAS library (OpenBLAS) linked against numpy/scipy, but I tested X.dot(W) and numpy.dot(X, W) (the latter does not work for sparse X), and this is not parallelised. – Charanpal
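
As a side note, a quick way to check which BLAS numpy is linked against, and to see that the BLAS threading only applies to dense-dense products (scipy's CSR-times-dense kernel is its own C++ routine and runs single-threaded), is a small sketch like the following; the thread count can usually be pinned with the OMP_NUM_THREADS or OPENBLAS_NUM_THREADS environment variables:

import time
import numpy
import scipy.sparse

numpy.show_config()   # lists the BLAS/LAPACK libraries numpy was built with

A = numpy.random.rand(2000, 2000)
B = numpy.random.rand(2000, 2000)
X = scipy.sparse.rand(2000, 2000, 0.1).tocsr()

startTime = time.time()
A.dot(B)              # dense * dense: dispatched to the BLAS, may use several threads
print("dense dot: %.3f s" % (time.time() - startTime))

startTime = time.time()
X.dot(B)              # sparse * dense: scipy's own kernel, single-threaded
print("sparse dot: %.3f s" % (time.time() - startTime))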

Answers


Your best bet is to drop down to C with Cython. That way you can beat the GIL and use OpenMP. I am not surprised that multiprocessing is slower -- there is a lot of overhead there.

Below is a naive wrapping of CSparse's sparse matrix - vector product code with OpenMP, callable from Python.

On my laptop this runs a little bit faster than scipy, but I don't have that many cores. The code, including the setup.py script, the C header files and the rest, is in this gist: https://gist.github.com/rmcgibbo/6019670

I suspect that if you really want the parallel code to be fast (on my laptop it is only about 20% faster than single-threaded scipy, even when using 4 threads), you need to think more carefully than I did about where the parallelism happens, paying attention to cache locality.

# psparse.pyx 

#----------------------------------------------------------------------------- 
# Imports 
#----------------------------------------------------------------------------- 
cimport cython 
cimport numpy as np 
import numpy as np 
import scipy.sparse 
from libc.stddef cimport ptrdiff_t 
from cython.parallel import parallel, prange 

#----------------------------------------------------------------------------- 
# Headers 
#----------------------------------------------------------------------------- 

ctypedef int csi 

ctypedef struct cs: 
    # matrix in compressed-column or triplet form 
    csi nzmax  # maximum number of entries 
    csi m   # number of rows 
    csi n   # number of columns 
    csi *p   # column pointers (size n+1) or col indices (size nzmax) 
    csi *i   # row indices, size nzmax 
    double *x  # numerical values, size nzmax 
    csi nz   # # of entries in triplet matrix, -1 for compressed-col 

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil 
cdef extern csi cs_print (cs *A, csi brief) nogil 

assert sizeof(csi) == 4 

#----------------------------------------------------------------------------- 
# Functions 
#----------------------------------------------------------------------------- 

@cython.boundscheck(False) 
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None): 
    """Multiply a sparse CSC matrix by a dense matrix 

    Parameters 
    ---------- 
    X : scipy.sparse.csc_matrix 
     A sparse matrix, of size N x M 
    W : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size M x P. Note, W must be contiguous and in 
     fortran (column-major) order. You can ensure this using 
     numpy's `asfortranarray` function. 

    Returns 
    ------- 
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran'] 
     A dense matrix, of size N x P, the result of multiplying X by W. 

    Notes 
    ----- 
    This function is parallelized over the columns of W using OpenMP. You 
    can control the number of threads at runtime using the OMP_NUM_THREADS 
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis. 
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the 
    GNU LGPL v2.1+. 

    References 
    ---------- 
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems 
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136 
    """ 
    if X.shape[1] != W.shape[0]: 
     raise ValueError('matrices are not aligned') 

    cdef int i 
    cdef cs csX 
    cdef np.ndarray[double, ndim=2, mode='fortran'] result 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr = X.indptr 
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices 
    cdef np.ndarray[double, ndim=1, mode = 'c'] data = X.data 

    # Pack the scipy data into the CSparse struct. This is just copying some 
    # pointers. 
    csX.nzmax = X.data.shape[0] 
    csX.m = X.shape[0] 
    csX.n = X.shape[1] 
    csX.p = &indptr[0] 
    csX.i = &indices[0] 
    csX.x = &data[0] 
    csX.nz = -1 # to indicate CSC format 

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double) 
    for i in prange(W.shape[1], nogil=True): 
     # W and result are in fortran (column-major) order, and X is CSC, so the 
     # columns we touch here are cheap to access 
     cs_gaxpy(&csX, &W[0, i], &result[0, i]) 

    return result 

It calls some C from CSparse:

// src/cs_gaxpy.c 

#include "cs.h" 
/* y = A*x+y */ 
csi cs_gaxpy (const cs *A, const double *x, double *y) 
{ 
    csi p, j, n, *Ap, *Ai ; 
    double *Ax ; 
    if (!CS_CSC (A) || !x || !y) return (0) ;  /* check inputs */ 
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ; 
    for (j = 0 ; j < n ; j++) 
    { 
     for (p = Ap [j] ; p < Ap [j+1] ; p++) 
     { 
     y [Ai [p]] += Ax [p] * x [j] ; 
     } 
    } 
    return (1) ; 
} 
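
For reference, a hypothetical usage sketch, assuming the gist's setup.py builds the extension as a module named psparse (pmultiply expects a CSC matrix and a fortran-ordered W):

import numpy as np
import scipy.sparse
from psparse import pmultiply   # module name assumed from the gist

X = scipy.sparse.rand(10000, 8000, 0.1).tocsc()
W = np.asfortranarray(np.random.rand(8000, 20))

A = pmultiply(X, W)
assert np.allclose(A, X.dot(W))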

Thanks for the response! I had a similar idea and wrote a Cython/OpenMP dot product based on Eigen (see pdot2d of https://github.com/charanpald/sppy/blob/master/sppy/csarray_base.pyx). There I split the rows of X into cpu_count blocks, and it runs about twice as fast on my 8-core machine (I believe it can be improved). I will compare it with your solution as soon as I sort out some compilation issues. – Charanpal
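
For illustration only, a serial Python sketch of that row-block decomposition (the actual speedup comes from running the blocks in parallel, e.g. in Cython/OpenMP):

import multiprocessing
import numpy
import scipy.sparse

def blockwise_dot(X, W, numBlocks=multiprocessing.cpu_count()):
    # Split the rows of X into contiguous blocks, multiply each block by W,
    # and stack the partial products; done serially here just to show the layout.
    bounds = numpy.linspace(0, X.shape[0], numBlocks + 1).astype(int)
    blocks = [X[bounds[k]:bounds[k+1], :].dot(W) for k in range(numBlocks)]
    return numpy.vstack(blocks)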


Perhaps a bit late with the reply. It is possible to obtain reliable parallel speedups by using the pyTrilinos package, which provides Python wrappers for many functions in Trilinos. Here is your example converted to use pyTrilinos:

from PyTrilinos import Epetra 
from scipy.sparse import rand 
import numpy as np 

n_rows = 10000 
n_cols = 8000 
n_vecs = 20 
fill_factor = 0.1 

comm = Epetra.PyComm() 
my_id = comm.MyPID() 

row_map = Epetra.Map(n_rows, 0, comm) 
out_vec_map = row_map 
in_vec_map = Epetra.Map(n_cols, 0, comm) 
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm) 

n_local_rows = row_map.NumMyElements() 

# Create local block matrix in scipy and convert to Epetra 
X = rand(n_local_rows, n_cols, fill_factor).tocoo() 

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True) 
A.InsertMyValues(X.row, X.col, X.data) 
A.FillComplete() 

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs) 
V = Epetra.MultiVector(in_vec_map, n_vecs) 

V[:] = W.T # order of indices is opposite 

B = Epetra.MultiVector(out_vec_map, n_vecs) 

# Multiply 
A.Multiply(False, V, B) 

You may then run this code using MPI:

mpiexec -n 2 python scipy_to_trilinos.py 

Other examples of PyTrilinos can be found in the GitHub repository here. Of course, if one were to use pyTrilinos, this way of initializing the matrix with scipy may not be the most optimal.