2012-04-01 63 views
19

我正在使用Python的multiprocessing模塊來並行處理大numpy數組。這些陣列在主進程中使用numpy.load(mmap_mode='r')進行內存映射。之後,multiprocessing.Pool()分叉進程(我想)。NumPy與多處理和mmap

一切似乎都做工精細,但我越來越線,如:

AttributeError的( 「 'NoneType' 對象有沒有屬性 '告訴'」)在<bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)>忽略

在單元測試日誌。儘管如此,測試通過很好。

任何想法發生了什麼?

使用Python 2.7.2,OS X,NumPy 1.6.1。


UPDATE:

一些調試之後,我追捕事業一個代碼路徑是使用(小片)的這個存儲器映射numpy的數組作爲輸入到Pool.imap呼叫。

顯然,「問題」是與multiprocessing.Pool.imap將輸入傳遞給新進程的方式:它使用pickle。這不適用於 ed numpy數組,以及內部中斷導致的錯誤。

我發現了Robert Kern的this reply,這似乎解決了同樣的問題。他建議爲imap輸入來自內存映射陣列時創建一個特殊的代碼路徑:在生成的進程中手動存儲器映射相同的陣列。

這將是如此複雜和醜陋,我寧願生活在錯誤和額外的內存拷貝。修改現有代碼有沒有其他方法可以輕鬆一些?

回答

22

我通常的做法是,在一個進程中完成所有的IO操作,然後將其發送到工作線程池中。要將一片memmapped數組加載到內存中,只需執行x = np.array(data[yourslice])data[yourslice].copy()實際上不會這樣做,這可能會導致一些混淆)。

首先,讓我們來生成一些測試數據:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

你可以用這樣的事情重現你的錯誤:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

如果你只是切換到產生np.array(data[start:stop])相反,你會解決問題:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

當然,這確實使額外內存中每個塊的副本。

從長遠來看,您可能會發現從memmapped文件切換到HDF等格式會更容易。如果你的數據是多維的,這尤其如此。 (我會推薦h5py,但如果你的數據是「表格式的」,pyTables是很好的。)

祝你好運,無論如何!

+0

喬你的答案總是搖滾。我一直在努力想出這樣的事情。 – YXD 2012-04-02 11:42:25

+0

感謝HDF小費。看起來像一個巨大的變化,但它可能是值得的,我會檢查出來。 – user124114 2012-04-02 12:55:47