2011-09-05 34 views
11

我有一個256x256x256 Numpy數組,其中每個元素是一個矩陣。我需要對這些矩陣中的每一個做一些計算,並且我想使用multiprocessing模塊來加快速度。結合itertools和多處理?

這些計算的結果必須被存儲在一個陣列256x256x256像原來之一,使得在元件[i,j,k]原始陣列中的矩陣的結果必須在新陣列的[i,j,k]元件放。

要做到這一點,我想列出一個列表,可以僞造的方式編寫爲[array[i,j,k], (i, j, k)]並將它傳遞給一個函數以「多處理」。 假設matrices是從原來的陣列和myfunc提取的所有矩陣的名單正在做的計算功能,代碼看起來有點像這樣:

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

然而,這似乎是map_async實際上是創建這個巨大的finput - 列表第一:我的CPU沒有太多的工作,但內存和交換在幾秒鐘內完全消耗,這顯然不是我想要的。

有沒有辦法將這個龐大的列表傳遞給一個多處理函數,而無需先顯式創建它? 或者你知道解決這個問題的另一種方法嗎?

非常感謝! :-)

+1

由於您在'map_async()'上使用'get()',因此您可能不希望執行* asynchronous *操作,而應該使用'Pool.map()'。 –

+0

也許我不明白這個問題,但你有沒有考慮過imap或imap_unordered? –

回答

10

全部multiprocessing.Pool.map*方法一旦函數被調用就完全消耗迭代器(demo code)。喂迭代一個組塊的映射函數的塊的時間,使用grouper_nofill

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS。 pool.map_asyncchunksize參數做了一些不同的操作:它將迭代器分解爲塊,然後將每個塊分配給一個稱爲map(func,chunk)的工作進程。如果func(item)完成得太快,這可以爲工作進程提供更多數據,但它不會對您的情況有所幫助,因爲在發出調用map_async之後,迭代器仍會立即充分消耗。

+0

非常感謝!你的解決方案似乎確實工作!作爲參考,我不得不使用pool.map_async(myfunc,finput).get(999999),但它的工作原理!但是,它仍然使用大量內存(當然取決於確切的chunksize),並且python在運行期間似乎不是垃圾收集。任何想法,爲什麼這可能是? – digitaldingo

+0

@digitaldingo:嗯,沒有任何想法。如果您可以將代碼縮減爲[SSCCE](http://sscce.org/)並將其發佈到此處,那將是理想之選。 – unutbu

0

Pool.map_async()需要知道可迭代的長度以將工作分派給多個工作者。由於izip沒有__len__,因此它會將迭代器首先轉換爲列表,導致您遇到的巨大內存使用量。

您可以嘗試通過使用__len__創建自己的izip樣式迭代器來避開此問題。

+0

它爲什麼需要知道?爲什麼它不能簡單地餵飽所有閒置的工人和等待? –

+0

@andrew - 'map_async()'('multiprocessing/pool.py')中的第一行實際上是'if not hasattr(iterable,'__len__'):iterable = list(iterable)'。它需要知道長度以創建足夠大的輸出列表,因爲工人的完成順序未知。 –

+0

嗯。它可以動態構建它,不是嗎?我只是想這可能會成爲一個問題。這似乎是一個有效的請求。 –

2

我也遇到了這個問題。而不是這樣的:

res = p.map(func, combinations(arr, select_n)) 

res = p.imap(func, combinations(arr, select_n)) 

IMAP不消耗它!