2014-11-06 76 views
12

我正在嘗試使用pandas數據框的多處理功能,即將數據幀拆分爲8個部分。使用apply(每個部分在不同的進程中處理)應用一些函數。pandas multiprocessing apply

編輯: 這裏的解決方案,我終於發現:

import multiprocessing as mp 
import pandas.util.testing as pdt 

def process_apply(x): 
    # do some stuff to data here 

def process(df): 
    res = df.apply(process_apply, axis=1) 
    return res 

if __name__ == '__main__': 
    p = mp.Pool(processes=8) 
    split_dfs = np.array_split(big_df,8) 
    pool_results = p.map(aoi_proc, split_dfs) 
    p.close() 
    p.join() 

    # merging parts processed by different processes 
    parts = pd.concat(pool_results, axis=0) 

    # merging newly calculated parts to big_df 
    big_df = pd.concat([big_df, parts], axis=1) 

    # checking if the dfs were merged correctly 
    pdt.assert_series_equal(parts['id'], big_df['id']) 
+0

'res = df.apply(process apply,axis = 1)'中有一個空格,是嗎? – 2014-11-06 16:26:31

+1

@yemu你到底想通過這段代碼實現什麼? – Dalek 2014-11-06 16:37:14

+0

目前僅適用於飽和CPU的一個內核。我想使用多進程並使用所有內核來減少處理時間 – yemu 2014-11-06 19:29:09

回答

3

因爲我沒有太多數據的腳本,這是一個猜測,但我會用p.map建議,而不是apply_async與回電話。

p = mp.Pool(8) 
pool_results = p.map(process, np.array_split(big_df,8)) 
p.close() 
p.join() 
results = [] 
for result in pool_results: 
    results.extend(result) 
+0

@yemu爲你做了這個工作嗎? – 2014-11-06 21:38:01

+0

如果__name__ =='__main__',我必須把調用放在裏面。並與其他小的變化,我設法使其工作,但我不確定池結果中的結果數據框是否以與拆分相同的順序返回。我必須檢查它。 – yemu 2014-11-07 09:24:53

+0

在這裏看到的解決方案與'dask' https://stackoverflow.com/questions/37979167/how-to-parallelize-many-fuzzy-string-comparisons-using-apply-in-pandas – 2016-06-24 18:03:11

0

我也遇到同樣的問題,當我使用multiprocessing.map()應用功能,以不同的塊一個的大數據幀的。

我只是想補充幾點以防其他人遇到同樣的問題。

  1. 記得添加if __name__ == '__main__':
  2. 執行文件中的.py文件,如果使用ipython/jupyter notebook,那麼你就不能運行multiprocessing(這是我的情況也是一樣的,雖然我不知道)
相關問題