2014-10-03 108 views
30

我已經使用rosetta.parallel.pandas_easy並行組後申請通過,例如:並行化應用大熊貓後GROUPBY

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame 
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index) 

但是,有沒有人想出如何並行返回一個數據幀的功能?正如預期的那樣,此代碼不適用於rosetta。

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

df.groupby(df.index).apply(tmpFunc) 
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index) 

回答

55

這似乎是工作,但它真的應該建在大熊貓

import pandas as pd 
from joblib import Parallel, delayed 
import multiprocessing 

def tmpFunc(df): 
    df['c'] = df.a + df.b 
    return df 

def applyParallel(dfGrouped, func): 
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped) 
    return pd.concat(retLst) 

if __name__ == '__main__': 
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2']) 
    print 'parallel version: ' 
    print applyParallel(df.groupby(df.index), tmpFunc) 

    print 'regular version: ' 
    print df.groupby(df.index).apply(tmpFunc) 

    print 'ideal version (does not work): ' 
    print df.groupby(df.index).applyParallel(tmpFunc) 
+0

你知道在將並行化併入熊貓方面是否有任何進展? – NumenorForLife 2015-05-14 19:53:55

+1

通過這樣做小的修改功能,可以做回分級指數,經常適用的回報: '高清temp_func(FUNC,名稱,組): 回報FUNC(組),名稱 高清applyParallel(dfGrouped ,func): 返回pd.concat(retLst,top_index = zip(* Parallel(n_jobs = multiprocessing.cpu_count())(延遲(temp_func)(func,name,group)名稱,dfGrouped中的組)) return pd.concat(retLst,我不知道如何在評論中發佈代碼... – BoZenKhaa 2015-12-10 17:12:57

+2

@ jsc123:有[dask](https://github.com/blaze/dask) – paulochf 2016-01-18 21:50:16

10

我有一個黑客,我用它來獲得熊貓的並行化。我將數據框分成塊,將每個塊放入列表的元素中,然後使用ipython的並行位對數據幀列表進行並行應用。然後我使用pandas concat函數將列表重新放在一起。

但是,這通常不適用。它適用於我,因爲我想應用於每個數據塊的功能需要大約一分鐘的時間。將數據拆分並放在一起並不需要很長時間。所以這顯然是一個混亂。這就是說,這裏是一個例子。我使用IPython的筆記本,所以你會在我的代碼中看到%%time魔術:

## make some example data 
import pandas as pd 

np.random.seed(1) 
n=10000 
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
        'data' : np.random.rand(n)}) 
grouped = df.groupby('mygroup') 

在這個例子中我將基於上述GROUPBY,使「塊」,但這並不一定是如何數據被分塊。雖然這是一個很常見的模式。

dflist = [] 
for name, group in grouped: 
    dflist.append(group) 

建立並行位

from IPython.parallel import Client 
rc = Client() 
lview = rc.load_balanced_view() 
lview.block = True 

寫一個愚蠢的功能應用到我們的數據

def myFunc(inDf): 
    inDf['newCol'] = inDf.data ** 10 
    return inDf 

現在讓我們來運行代碼的串行然後並行。 串行第一:

%%time 
serial_list = map(myFunc, dflist) 
CPU times: user 14 s, sys: 19.9 ms, total: 14 s 
Wall time: 14 s 

現在平行

%%time 
parallel_list = lview.map(myFunc, dflist) 

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s 
Wall time: 1.56 s 

那麼只需要幾毫秒它們合併到一個數據幀

%%time 
combinedDf = pd.concat(parallel_list) 
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms 
Wall time: 300 ms 

我上運行6個IPython的發動機我MacBook,但你可以看到它將執行時間從14秒降到2秒。

對於真正長時間運行的隨機模擬,我可以通過使用StarCluster啓動羣集來使用AWS後端。然而,大多數情況下,我在MBP上並行處理8個CPU。

+0

我會用我的代碼試試這個,謝謝。你能向我解釋爲什麼申請不會自動並行操作?看起來應用函數的整體好處是避免循環,但如果它沒有對這些組進行這樣的操作,那麼結果如何? – robertevansanders 2014-11-19 21:41:00

+1

由於GIL,並行化在Python中很難實現。請記住,應用通常是語法糖,並在其下面執行隱含循環。使用並行化有點棘手,因爲並行化會產生運行時間成本,這有時會消除並行化的好處。 – 2014-11-19 21:51:30

+0

「parallel_list」中有一個缺失的定義,因爲它在這一行提供了一個錯誤'name'parallel_list'未定義':'combinedDf = pd.concat(parallel_list)'? – Primer 2014-11-21 14:21:24

29

伊萬的答案是偉大的,但它看起來像它可以稍作簡化,也不再需要依賴於JOBLIB:

from multiprocessing import Pool, cpu_count 

def applyParallel(dfGrouped, func): 
    with Pool(cpu_count()) as p: 
     ret_list = p.map(func, [group for name, group in dfGrouped]) 
    return pandas.concat(ret_list) 

順便說一句:這不能代替任何 groupby.apply(),但它會覆蓋典型案例:如它應該涵蓋案例2和3 in the documentation,而您應該通過將參數axis=1給予最終的pandas.concat()調用來獲得案例1的行爲。

+0

當我使用REPL運行它時,出現錯誤'_pickle.PicklingError:無法在0x7ff841f48d90> pickle :__main__上的屬性查找tmpFunc失敗,但是如何使用REPL執行此操作? – Keiku 2018-01-31 08:25:05

+0

@Keiku沒有想法,我以前從來沒有聽說過REPL,但是你用''func = lambda x:x'來試試嗎?如果這也行不通,我建議你打開一個具體的問題。只能用''applyParallel([('one',1),('two',2)],your_func)'' – 2018-02-01 09:09:34

+0

複製,感謝您的建議,似乎我嘗試重新啓動控制檯並解決它。麻煩您了。 – Keiku 2018-02-01 09:14:26

0

一個簡短的評論陪伴JD龍的答案。我發現如果組的數量非常大(比如說成千上萬),並且你的apply函數正在做一些相當簡單和快速的事情,那麼把你的數據框分成塊,並把每個塊分配給一個工作者來執行groupby-apply(連續)可以比並行group by-apply快得多,並讓工作人員讀取包含多個組的隊列。例如:

import pandas as pd 
import numpy as np 
import time 
from concurrent.futures import ProcessPoolExecutor, as_completed 

nrows = 15000 
np.random.seed(1980) 
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))}) 

因此,我們的數據框的樣子:

a 
0 3425 
1 1016 
2 8141 
3 9263 
4 8018 

注意,列 '' 具有多組(認爲客戶ID):

len(df.a.unique()) 
15000 

對操作功能我們組:

def f1(group): 
    time.sleep(0.0001) 
    return group 

開始ap OOL:

ppe = ProcessPoolExecutor(12) 
futures = [] 
results = [] 

執行並行GROUPBY申請:

%%time 

for name, group in df.groupby('a'): 
    p = ppe.submit(f1, group) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 
del ppe 

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s 
Wall time: 17.9 s 

現在,讓我們添加一個分隔DF成少很多組列:

df['b'] = np.random.randint(0, 12, nrows) 

現在不是15000組有隻有12:

len(df.b.unique()) 
12 

我們將分配我們的df,並在每個塊上執行groupby-apply。

ppe = ProcessPoolExecutor(12) 

包裝的樂趣:

def f2(df): 
    df.groupby('a').apply(f1) 
    return df 

發送每個塊以串行操作上:

%%time 

for i in df.b.unique(): 
    p = ppe.submit(f2, df[df.b==i]) 
    futures.append(p) 

for future in as_completed(futures): 
    r = future.result() 
    results.append(r) 

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s 
Wall time: 12.4 s 

注意,每組花費的時間量沒有變化。而改變的是工人從中讀取的隊列長度。我懷疑發生了什麼事情是工作人員不能同時訪問共享內存,並且不斷返回來讀取隊列,並因此踩到彼此的腳趾。隨着更大的塊運行,工作人員返回的頻率減少,所以這個問題得到改善,整體執行速度更快。