2017-06-22 80 views
1

我有兩組,其中一組的行要作爲組處理,另一組有待查看。多處理組應用python

test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

gr1 = test.groupby('city') 
gr2 = test2.groupby('city') 

目前我申請我的功能組中的每一行,

gr1.apply(lambda x: custom_func(x.Address1, gr2.get_group(x.name))) 

但是我不知道該怎麼做多處理這一點。請指教。

編輯: - 我試圖使用dask,但我不能將整個數據幀傳遞給dask函數 - 因爲它的apply函數有一個限制。我嘗試在我的gr1(group)上使用dask apply,但由於我在自定義函數中設置索引,dask會拋出一個錯誤,指出「索引器太多」。

這裏有DASK,這給了我一個錯誤 - 「Pandas' object has no attribute 'city'

ddf1 = dd.from_pandas(test, 2) 
ddf2 = dd.from_pandas(test2, 2) 

dgr1 = ddf1.groupby('city') 
dgr2 = ddf2.groupby('city') 

meta = pd.DataFrame(columns=['Address1', 'score', 'idx','source_index']) 
ddf1.map_partitions(custom_func, x.Address1, dgr2.get_group(x.city).Address1,meta=meta).compute() 
+0

看看'dask',它與熊貓很好地結合在一起。 – suvy

+0

是的,但是dask不支持通過應用函數傳遞數據幀。第二件事,當我試圖在組上應用dask時,它會因爲「我試圖在我的custom_func中設置索引」中的「索引太多」而失敗。 –

+0

dask apply應該明智地工作,以便明智地使用map_partition。可能是很酷的你編輯你的問題與你嘗試和錯誤報告。 – suvy

回答

2

我提供了另一種解決方案,在這裏使用DASK,

import pandas as pd 
from multiprocessing import Pool 
test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

test=test.assign(dataset = 'test') 
test2=test2.assign(dataset = 'test2') 

newdf=pd.concat([test2,test],keys = ['test2','test']) 
gpd=newdf.groupby('city') 
def my_func(mygrp): 
    test_data=mygrp.loc['test'] 
    test2_data=mygrp.loc['test2'] 
    #do something specific 
    #if needed print something 
    return {'Address':test2_data.Address1.values[0],'ID':test2_data.ID.values[0]} #return some other stuff 

mypool=Pool(processes=2) 
ret_list=mypool.imap(my_func,(group for name, group in gpd)) 

pd.DataFrame(ret_list) 

回報像

ID address 
0 3 234 kookie Pl 
1 1 123 chese wy 
2 8 456 Pretzel Junktion 
3 4 345 Pizzza DR 

PS:在OP的問題中,兩個相似的數據集在一個專門的函數中進行比較,這裏的解決方案是u ses pandas.concat。根據問題,還可以想象一個pd.merge

+0

嘿。謝謝你的幫助。不斷收到此內存錯誤。 :('Traceback(最近一次調用最後一個): 文件「script.py」,第98行,在 main() 主文件「script.py」,行87, ret_list = mypool.map(my_func, (group for name,group in gpd)) 文件「/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py」,第251行,在地圖 return self.map_async(func,iterable,chunksize) .get() 文件「/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py」,第567行,在得到 raise self._value MemoryError' –

+0

儘管我有32GB的RAM和256GB的磁盤我認爲,pool.map將整個數據複製到每個導致內存問題的子進程中,我不需要父進程apar中的任何數據從我通過的東西。我如何實現這一目標? –

+0

Yaa地圖會製作一個密集的迭代器列表,所以請改用mypool.imap。 – suvy