我想了解multiprocessing.Pool是如何工作的,並且我開發了一個說明我的問題的最小例子。簡而言之,我使用pool.map通過以下示例Dead simple example of using Multiprocessing Queue, Pool and Locking並行化在陣列上運行的CPU綁定函數。當我遵循這種模式時,我只獲得了4核心的適度加速,但是如果我將數組手動分塊爲num_threads,然後在分塊上使用pool.map,我發現加速因子大大超過了4x,這使得無感覺到我。要遵循的細節。使用python多重處理優化一個簡單的CPU綁定函數
一,函數定義。
def take_up_time():
n = 1e3
while n > 0:
n -= 1
def count_even_numbers(x):
take_up_time()
return np.where(np.mod(x, 2) == 0, 1, 0)
現在定義我們將進行基準測試的函數。
首先,在連續運行的功能:
def serial(arr):
return np.sum(map(count_even_numbers,arr))
現在使用Pool.map在「標準」的方式功能:
def parallelization_strategy1(arr):
num_threads = multiprocessing_count()
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,arr)
pool.close()
return np.sum(result)
最後,第二個策略中,我手動塊陣列,然後運行Pool.map在組塊(分裂溶液由於python numpy split array into unequal subarrays)
def split_padded(a,n):
""" Simple helper function for strategy 2
"""
padding = (-len(a))%n
if padding == 0:
return np.split(a, n)
else:
sub_arrays = np.split(np.concatenate((a,np.zeros(padding))),n)
sub_arrays[-1] = sub_arrays[-1][:-padding]
return sub_arrays
def parallelization_strategy2(arr):
num_threads = multiprocessing_count()
sub_arrays = split_padded(arr, num_threads)
pool = multiprocessing.Pool(num_threads)
result = pool.map(count_even_numbers,sub_arrays)
pool.close()
return np.sum(np.array(result))
這裏是我的陣列輸入:
npts = 1e3
arr = np.arange(npts)
現在我用的IPython%timeit函數運行我的計時,併爲1E3點我得到以下幾點:
- 串行:10個循環,最好的3:98.7毫秒每循環
- parallelization_strategy1:10個循環,最好的3:每次循環77.7毫秒
- parallelizat ion_strategy2:10個循環,最好的3:22毫秒每循環
由於我有4個內核,策略1是令人失望的適度的加速,和策略2是可疑大於最大4倍加速比更大。
當我增加NPTS到1E4,結果是更加令人困惑:
- 串行:1個循環,最好的3:每圈967毫秒
- parallelization_strategy1:1個循環,最好的每循環3:596毫秒
- parallelization_strategy2:10循環,最好是3:22。每個環路9毫秒
所以混亂的兩個來源是:
- 策略2比幼稚理論極限
- 出於某種原因方式更快,%timeit與NPTS = 1E4僅觸發1循環爲串行和策略1,但10策略2循環。
你比較了不同策略的結果嗎? – MSeifert