2015-04-03 44 views
0

這裏是我使用的代碼:爲什麼多處理池的這個實現不起作用?

import pandas as pd 
import sys, multiprocessing 

train_data_file = '/home/simon/ali_bigdata/train_data_user_2.0.csv' 
user_list_file = '/home/simon/ali_bigdata/user_list.txt' 



def feature_extract(list_file, feature_extract_func): 
    tmp_list = [line.strip() for line in open(list_file)] 

    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    results_list = pool.map(feature_extract_func, tmp_list) 

    for tmp in results_list: 
     for i in tmp: 
      print i,"\t", 
     print "\n" 

    pool.close() 
    pool.join() 

def user_feature(tmp_user_id): 
    sys.stderr.write("process user " + tmp_user_id + " ...\n") 
    try: 
     tmp_user_df = df_user.loc[int(tmp_user_id)] 
    except KeyError: 
     return [tmp_user_id, 0, 0, 0.0] 
    else: 
     if type(tmp_user_df) == pd.core.series.Series: 
      tmp_user_click = 1 
     else: 
      (tmp_user_click, suck) = tmp_user_df.shape 

     tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4] 
     if type(tmp_user_buy_df) == pd.core.frame.DataFrame: 
      tmp_user_buy = 1 
     else: 
      (tmp_user_buy, suck) = tmp_user_buy_df.shape 


     return [tmp_user_id, tmp_user_click, tmp_user_buy, 0.0 if tmp_user_click == 0 else float(tmp_user_buy)/tmp_user_click] 


df = pd.read_csv(train_data_file, header=0) 
df_user = df.set_index(['user_id']) 
feature_extract(user_list_file, user_feature) 

我得到的錯誤是:

process user 102761946 ... 
process user 110858443 ... 
process user 131681429 ... 
Traceback (most recent call last): 
    File "extract_feature_2.0.py", line 53, in <module> 
    feature_extract(user_list_file, user_feature) 
    File "extract_feature_2.0.py", line 13, in feature_extract 
    results_list = pool.map(feature_extract_func, tmp_list) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get 
    raise self._value 
KeyError: 'the label [False] is not in the [index]' 

當程序運行一段時間它發生。

那麼這個錯誤是什麼意思,我該如何多處理這個映射函數呢?

這裏的輸入數據格式

user_id,item_id,behavior_type,user_geohash,item_category,date,time 
99512554,37320317,3,94gn6nd,9232,2014-11-26,20 
9909811,266982489,1,,3475,2014-12-02,23 
98692568,27121464,1,94h63np,5201,2014-11-19,13 

回答

0

很難在裏面多用的功能調試錯誤。您應該關閉多處理器進行調試,然後在修復時重新打開它。我通常在我的函數中有一個mp=True參數,默認情況下它在多處理模式下運行該函數,但可以設置爲False以使用常規的非多處理運行map(使用if測試),以便我可以調試這些類型的錯誤。

所以,你可以設置你的功能就是這樣,並與mp=False參數運行進行調試:

def feature_extract(list_file, feature_extract_func, mp=True): 
    tmp_list = [line.strip() for line in open(list_file)] 

    if mp: 
     pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
     results_list = pool.map(feature_extract_func, tmp_list) 
    else: 
     results_list = map(feature_extract_func, tmp_list) 

    for tmp in results_list: 
     for i in tmp: 
      print i,"\t", 
     print "\n" 

    if mp: 
     pool.close() 
     pool.join() 

此外,Pool自動默認使用可用CPU的數量,這樣就不會需要設置進程的數量,除非你想要與之不同的東西。

此外,在這種情況下,使用生成器表達式而不是列表理解更有效率(儘管您可以更輕鬆地切分列表理解,因此對於調試,您可能希望使用列表理解跳轉到前面對導致問題的指標):

所以,一旦調試完畢,更換:

tmp_list = [line.strip() for line in open(list_file)] 

有:

tmp_list = (line.strip() for line in open(list_file)) 
+0

在調試方法的幫助下,我終於修好了!感謝:) – 2015-04-07 07:04:37

+0

@simon_xia:到底是什麼問題? – mhawke 2015-04-07 07:37:58

+0

@mhawke它是由'df_user.loc [int(tmp_user_id)]'的返回值引起的,當只有一行滿足條件時,它可能是一個系列。所以'tmp_user_buy_df = tmp_user_df.loc [tmp_user_df ['behavior_type'] == 4]'這個語句將細分 – 2015-04-11 01:11:35

0

你的避風港沒有顯示出現錯誤時正在播放的任何數據。請在您的問題中發佈能夠觸發問題的代表性數據 - 如果您的問題可以複製,那麼幫助您會容易得多。

我認爲錯誤在該行發生的事情:

tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4] 

tmp_user_df['behavior_type'] == 4返回一個布爾值 - 真或假 - 然後將其用作標籤。因爲標籤False不是數據幀中的標籤/系列KeyError: 'the label [False] is not in the [index]'被引發。我很疑惑爲什麼True案件顯然有效,但是我們還沒有看到您的數據,所以可能會有解釋。

你可能打算傳遞一個布爾數組作爲選擇器;如果是這樣,將行爲類型查找包裝在列表中,例如,G:

tmp_user_buy_df = tmp_user_df.loc[[tmp_user_df['behavior_type'] == 4]] 

另外,isinstance()優於type(x) == X,見this comprehensive explanation,可以將線

if type(tmp_user_df) == pd.core.series.Series: 

更改爲

if isinstance(tmp_user_df, pd.core.series.Series): 

if type(tmp_user_buy_df) == pd.core.frame.DataFrame: 

if isinstance(tmp_user_buy_df, pd.core.frame.DataFrame): 
+0

抱歉有點晚了,我剛剛粘貼了數據格式。我想要做的是選擇相同用戶的behavior_type 4 – 2015-04-07 01:18:29

+0

感謝您使用'isinstance()'的建議,在其他情況下可以正常工作,但在這種情況下無法工作,不知怎的,這很奇怪 – 2015-04-11 01:22:01

+0

'isinstance ()'的例子相當於使用'type'作爲所使用的對象。 – mhawke 2015-04-11 02:08:53