在使用多處理程序包(Amazon EC2上的Ubuntu 12.04上的numpy 1.7.0使用python 2.73)並行執行一些簡單的基於numpy的矩陣代數計算時,出現系統錯誤(如下所示) 。我的代碼對於較小的矩陣大小工作正常,但對較大的內存崩潰(具有足夠的可用內存)使用多重處理運行子進程時出現系統錯誤
我使用的矩陣大小很大(我的代碼對1000000x10浮點密集矩陣運行正常,但崩潰了1000000x500個 - 我順便通過這些矩陣到/從子進程)。 10 vs 500是一個運行時參數,其他所有內容都保持不變(輸入數據,其他運行時參數等)
我也嘗試使用python3運行相同(移植)的代碼 - 對於較大的矩陣子進程進入睡眠/空閒模式(而不是像Python 2.7中的崩潰),程序/子進程只是掛在那裏無所事事。對於較小的矩陣,代碼可以在python3下正常運行。
任何建議,將不勝感激(我運行的想法在這裏)
錯誤消息:
Exception in thread Thread-5: Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run() File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task) SystemError: NULL result without error in PyObject_Call
的多重代碼我使用:
def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
if len(listOfInputs) == 0:
return
# Add result queue to the list of argument tuples.
resultQueue = mp.Manager().Queue()
listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
# Create and initialize the pool of workers.
pool = mp.Pool(processes = nParallelProcesses)
pool.map(proc, listOfInputsNew)
# Run the processes.
pool.close()
pool.join()
# Return the results.
return [resultQueue.get() for i in range(len(listOfInputs))]
下面是「 proc「執行每個子進程。基本上,它使用numpy解決了許多線性方程組的系統(它在子過程內構造了所需的矩陣)並將結果作爲另一個矩陣返回。再一次,它適用於一個運行時參數的較小值,但對於較大的參數會崩潰(或掛在python3中)。
def solveForLFV(param):
startTime = time.time()
(chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
LFoutChunkSize = XY.shape[0]
nLFdim = LFVin.shape[1]
sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
for LFVoutIndex in xrange(LFoutChunkSize):
LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
sumLFVinOuterProductLFVpurch[:, :] = 0.
LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
queue.put((chunkI, LFVoutChunk))
print 'solveForLFV: ', time.time() - startTime, 'sec'
sys.stdout.flush()
你可以分享proc函數的代碼嗎? – barracel 2013-03-01 22:10:33
剛剛做到了。我沒有描述proc的論點 - 其中一些是矩陣,一些是列表清單,有些只是浮點/整數。 'queue'用於返回每個子進程的結果。 – Yevgeny 2013-03-01 22:40:05