2013-02-27

I'm getting a system error (shown below) while running some fairly simple numpy-based matrix algebra computations in parallel with the multiprocessing package (python 2.73 with numpy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code works fine for smaller matrix sizes, but crashes for larger ones (with plenty of available memory): a SystemError is raised while the subprocesses are being run with multiprocessing.

The matrix sizes I use are substantial (my code runs fine for 1000000x10 dense float matrices but crashes for 1000000x500 ones; I pass these matrices to/from the subprocesses, by the way). 10 vs 500 is a run-time parameter; everything else stays the same (input data, other run-time parameters, etc.).

I've also tried running the same (ported) code with python3: for the larger matrices the subprocesses go into a sleep/idle mode (instead of crashing as in python 2.7) and the program/subprocesses just hang there doing nothing. For the smaller matrices the code runs fine under python3.

Any suggestions would be much appreciated (I'm running out of ideas here).

Error message:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call
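The traceback points at put(task): the pool pickles every task before writing it to the worker pipe, so each full input matrix gets serialized per task. A cheap way to gauge that payload size (a deliberately small shape is used here so the sketch stays light; the question's 1000000x500 float64 matrix would pickle to roughly 4 GB):

```python
import pickle

import numpy as np

# multiprocessing.Pool serializes each task before sending it to a worker.
# Measure the pickled size of a small stand-in array:
arr = np.zeros((1000, 500))
payload = pickle.dumps(arr, protocol=2)
print(len(payload))   # slightly over 1000 * 500 * 8 = 4,000,000 bytes
```

Scaling the same arithmetic to 1000000x500 gives a multi-gigabyte payload per task, which is where python 2's pickling machinery runs into trouble.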

The multiprocessing code I use:

import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add the shared result queue to each argument tuple.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create the pool of workers and run the processes.
    pool = mp.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    pool.close()
    pool.join()
    # Collect the results.
    return [resultQueue.get() for i in range(len(listOfInputs))]
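For reference, the helper can be exercised with a toy worker; a minimal self-contained sketch (the worker and its inputs are hypothetical, not from the question). Note that queue.get() yields results in completion order, not submission order, which is why each worker tags its result with its chunk index:

```python
import multiprocessing as mp

def worker(param):
    # Hypothetical stand-in for 'proc': unpack (chunk index, data) plus the
    # shared queue, and tag the result with the chunk index.
    (chunkI, x), queue = param
    queue.put((chunkI, x * x))

queue = mp.Manager().Queue()
inputs = [((i, i), queue) for i in range(4)]
pool = mp.Pool(processes=2)
pool.map(worker, inputs)
pool.close()
pool.join()
# Results arrive in completion order; the chunk index lets the caller
# reassemble them regardless of which worker finished first.
results = dict(queue.get() for _ in range(4))
print(results)
```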

Below is the "proc" that gets executed for each subprocess. Basically, it solves many systems of linear equations using numpy (it constructs the required matrices inside the subprocess) and returns the results as another matrix. Once again, it works fine for smaller values of the one run-time parameter but crashes (or hangs in python3) for larger ones.

import sys
import time

import numpy as np

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        # getChunkBoundaries is a helper defined elsewhere in the code.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()
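As an aside, the chunked outer-product accumulation in the inner loop is mathematically a Gram matrix: for a 2-D slice, summing the per-row outer products equals a single matrix product, which also avoids materializing the (n, d, d) broadcast temporary. A small sanity check (shapes are illustrative):

```python
import numpy as np

rng = np.random.RandomState(0)
LFVinSlice = rng.rand(7, 3)   # illustrative: 7 rows, 3 latent dimensions

# The question's broadcasted form: sum of per-row outer products...
broadcast_sum = (LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]).sum(axis=0)
# ...equals one matrix product, with no (n, d, d) intermediate array:
gram = LFVinSlice.T.dot(LFVinSlice)

print(np.allclose(broadcast_sum, gram))   # True
```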

Can you share the code of the proc function? – barracel 2013-03-01 22:10:33


Just did. I hadn't described the arguments of proc: some of them are matrices, some are lists of lists, and some are just floats/integers. 'queue' is used to return each subprocess's results. – Yevgeny 2013-03-01 22:40:05

Answer


500,000,000 is pretty big: if you're using float64, that's 4 billion bytes, or about 4 GB. (The 10,000,000-float array would be 80 million bytes, or about 80 MB: much smaller.) I expect the problem has to do with multiprocessing trying to pickle the arrays to send to the subprocesses over a pipe.
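The arithmetic above, spelled out (assuming float64, as the answer does):

```python
import numpy as np

itemsize = np.dtype(np.float64).itemsize   # 8 bytes per element

crashing = 1000000 * 500 * itemsize   # the 1000000x500 case
working = 1000000 * 10 * itemsize     # the 1000000x10 case

print(crashing, working)   # 4000000000 (~4 GB) vs 80000000 (~80 MB)
```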

Since you're on a unix platform, you can avoid this by exploiting the memory-inheritance behavior of fork() (which is what multiprocessing uses to create its workers). Here's a hack I've had success with (ripped out of this project), described in the comments.

import itertools
import os
import random
import string

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count())
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name] 
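The core trick ForkedData relies on can be shown without the bookkeeping. A minimal sketch (names are illustrative; this depends on fork() semantics, so it is POSIX-only):

```python
import multiprocessing as mp

import numpy as np

# A module-level global set *before* the Pool is created is inherited by the
# forked children (copy-on-write), so the big array itself never crosses a
# pipe. POSIX fork only; spawn-based platforms re-import instead.
_shared = np.arange(12.0).reshape(4, 3)   # stand-in for the big matrix

def row_sum(i):
    # The child reads the inherited array; only the small index argument and
    # the small float result are pickled.
    return float(_shared[i].sum())

pool = mp.Pool(processes=2)
sums = pool.map(row_sum, range(4))
pool.close()
pool.join()
print(sums)   # [3.0, 12.0, 21.0, 30.0]
```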