I have a large sparse matrix X in scipy.sparse.csr_matrix format and I want to multiply it by a numpy array W, making use of parallelism. After some research I found that I need to use Array in multiprocessing to avoid copying X and W between processes (e.g. from here: How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt at parallelising scipy sparse matrix multiplication:
import multiprocessing
import numpy
import scipy.sparse
import time

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    global XData
    global XIndices
    global XIndptr
    global Xshape

    XData = data
    XIndices = indices
    XIndptr = indptr
    Xshape = shape

    global WArray
    global WShape

    WArray = Warr
    WShape = Wshp

def dot2(args):
    rowInds, i = args

    global XData
    global XIndices
    global XIndptr
    global Xshape

    # Rebuild numpy views on the shared buffers and wrap them in a csr_matrix
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X):
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

    # Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
        params = []

        for i in range(numJobs):
            params.append((rowInds, i))

        iterator = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1]))

        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = iterator[i]

        return P

    return matmat

if __name__ == '__main__':
    # Create a random sparse matrix X and a random dense one W
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time() - startTime

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time() - startTime

    print(parallelTime, nonParallelTime)
However, the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.
I believe a slowdown occurs in similar situations when one copies large data to the processes, but that should not be the case here since I use Array to store the shared variables (unless the copy happens inside numpy.frombuffer or when the csr_matrix is created, but I could not find a way to share a csr_matrix directly). Another possible cause of the slowness is that each process returns a large block of the matrix product, but I am not sure of a way around this.
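One idea I have considered but not benchmarked is to have each worker write its block of the product directly into a shared output RawArray instead of returning it, so that nothing large has to be pickled back to the parent. Below is a minimal sketch of that idea, assuming the same RawArray-based sharing as above; the names init_worker, dot_block and PArray are made up for illustration.

import multiprocessing
import numpy
import scipy.sparse

def init_worker(data, indices, indptr, shape, Warr, Wshp, Parr):
    # Keep references to the shared RawArrays in module globals so each
    # worker can rebuild numpy/scipy views on them without copying.
    global XData, XIndices, XIndptr, Xshape, WArray, WShape, PArray
    XData, XIndices, XIndptr, Xshape = data, indices, indptr, shape
    WArray, WShape, PArray = Warr, Wshp, Parr

def dot_block(bounds):
    start, end = bounds
    Xr = scipy.sparse.csr_matrix(
        (numpy.frombuffer(XData, dtype=numpy.float64),
         numpy.frombuffer(XIndices, dtype=numpy.int32),
         numpy.frombuffer(XIndptr, dtype=numpy.int32)), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((Xshape[0], WShape[1]))
    # Each worker writes its own disjoint block of rows, so no locking is
    # needed and nothing large is sent back through the pool.
    P[start:end, :] = Xr[start:end, :].dot(W)

if __name__ == '__main__':
    X = scipy.sparse.rand(10000, 8000, 0.1).tocsr()
    W = numpy.random.rand(8000, 20)

    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)
    WArray = multiprocessing.RawArray("d", W.flatten())
    # Zero-initialised shared buffer that will hold the product
    PArray = multiprocessing.RawArray("d", X.shape[0] * W.shape[1])

    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.linspace(0, X.shape[0], numJobs + 1).astype(numpy.int64)
    bounds = list(zip(rowInds[:-1], rowInds[1:]))

    pool = multiprocessing.Pool(processes=numJobs, initializer=init_worker,
                                initargs=(XData, XIndices, XIndptr, X.shape,
                                          WArray, W.shape, PArray))
    pool.map(dot_block, bounds)
    pool.close()
    pool.join()

    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((X.shape[0], W.shape[1]))
    print(numpy.allclose(P, X.dot(W)))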
Can anyone see where I am going wrong? Thanks for any help!
Update: I can't be sure, but I think sharing large amounts of data between processes is just not very efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html), although a lot of the numpy functions need to go through the GIL.
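Along the same lines, a pure-Python thread-based version is easy to benchmark, since threads share memory and avoid the copying problem entirely; whether it gives any speed-up depends on whether the underlying sparse dot releases the GIL, which I cannot confirm, so this is only a sketch (threaded_matmat is a made-up name).

from concurrent.futures import ThreadPoolExecutor

import numpy
import scipy.sparse

def threaded_matmat(X, W, numThreads=4):
    # Split the rows of X into contiguous blocks and multiply each block in a
    # thread.  No data is copied between workers because threads share memory,
    # but any speed-up depends on the sparse dot releasing the GIL.
    rowInds = numpy.linspace(0, X.shape[0], numThreads + 1).astype(int)
    P = numpy.zeros((X.shape[0], W.shape[1]))

    def work(i):
        P[rowInds[i]:rowInds[i+1], :] = X[rowInds[i]:rowInds[i+1], :].dot(W)

    with ThreadPoolExecutor(max_workers=numThreads) as executor:
        list(executor.map(work, range(numThreads)))
    return P

# Example usage:
# X = scipy.sparse.rand(10000, 8000, 0.1).tocsr()
# W = numpy.random.rand(8000, 20)
# P = threaded_matmat(X, W)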
Do you have numpy/scipy linked against an optimised multithreaded ATLAS build? If you do, you should get parallel matrix multiplication for free when you use np.dot. –
I am using a multithreaded BLAS library (OpenBLAS) linked against numpy/scipy, and I tested X.dot(W) and numpy.dot(X, W) (the latter does not work for sparse X), but it is not parallelised. – Charanpal
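As a side note on the BLAS question above: one can check what numpy is linked against with numpy.show_config() and time a dense-dense product, which should use all cores if the BLAS is multithreaded. As far as I can tell, scipy's sparse-dense product goes through scipy's own sparsetools routines rather than BLAS, which would explain why X.dot(W) is not parallelised even with OpenBLAS. For example:

import numpy
import time

# Show which BLAS/LAPACK libraries numpy was built against (e.g. OpenBLAS, ATLAS, MKL).
numpy.show_config()

# A dense-dense product of this size should use all cores if the BLAS is
# multithreaded; watch CPU usage (or compare against OMP_NUM_THREADS=1) while it runs.
A = numpy.random.rand(3000, 3000)
B = numpy.random.rand(3000, 3000)
start = time.time()
A.dot(B)
print("dense dot time:", time.time() - start)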