2012-05-24 66 views
4

How do I optimize a GROUP BY statement in Pig Latin?

I have a skewed dataset, and I need to do a GROUP BY followed by a nested FOREACH. Because the data is skewed, a few reducers take a very long time while the rest take almost none. I know skewed joins exist, but is there something similar for GROUP BY and COUNT? Here is my Pig code (variables renamed):

foo_grouped = GROUP foo BY FOO;
FOO_stats = FOREACH foo_grouped
{
    a_FOO_total = foo.ATTR;
    a_FOO_total = DISTINCT a_FOO_total;

    bar_count = foo.BAR;
    bar_count = DISTINCT bar_count;

    a_FOO_type1 = FILTER foo BY COND1 == 'Y';
    a_FOO_type1 = a_FOO_type1.ATTR;
    a_FOO_type1 = DISTINCT a_FOO_type1;

    a_FOO_type2 = FILTER foo BY COND2 == 'Y' OR COND3 == 'HIGH';
    a_FOO_type2 = a_FOO_type2.ATTR;
    a_FOO_type2 = DISTINCT a_FOO_type2;

    GENERATE group AS FOO,
             COUNT(a_FOO_total) AS a_FOO_total,
             COUNT(a_FOO_type1) AS a_FOO_type1,
             COUNT(a_FOO_type2) AS a_FOO_type2,
             COUNT(bar_count) AS bar_count;
}
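For reference, the skewed join mentioned above is requested with a `USING 'skewed'` hint; Pig then samples the key distribution and splits hot keys across reducers. A minimal sketch (the relation and field names here are made up for illustration):

big   = LOAD 'big_input'   USING PigStorage(',') AS (k, v);
small = LOAD 'small_input' USING PigStorage(',') AS (k, w);
-- Pig samples 'big', detects hot values of k, and spreads them over several reducers
j = JOIN big BY k, small BY k USING 'skewed';

As the question notes, this hint only applies to JOIN, not to GROUP BY.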

Answers

9

In your example there are many nested DISTINCT operators inside the FOREACH. They are executed in the reducer and rely on RAM to compute the distinct values, and the query produces only one MapReduce job. If a group has too many distinct elements, you can also get memory-related exceptions.

Fortunately, Pig Latin is a dataflow language, so to some extent you get to write the execution plan. To use more CPUs, you can restructure the code so that it forces more MapReduce jobs that can run in parallel. To do that, we rewrite the query without nested DISTINCTs. The trick is to do each distinct operation separately and then group, as if you had only one column, and finally merge the results. It is very SQL-like, but it works. Here it is:

records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w); 
selected = FOREACH records GENERATE g, a, b, c, d; 
grouped_a = FOREACH selected GENERATE g, a; 
grouped_a = DISTINCT grouped_a; 
grouped_a_count = GROUP grouped_a BY g; 
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count; 

grouped_b = FOREACH selected GENERATE g, b; 
grouped_b = DISTINCT grouped_b; 
grouped_b_count = GROUP grouped_b BY g; 
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count; 

grouped_c = FOREACH selected GENERATE g, c; 
grouped_c = DISTINCT grouped_c; 
grouped_c_count = GROUP grouped_c BY g; 
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count; 

grouped_d = FOREACH selected GENERATE g, d; 
grouped_d = DISTINCT grouped_d; 
grouped_d_count = GROUP grouped_d BY g; 
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count; 

mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g; 
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count; 
STORE out into '....' USING PigStorage(','); 

On execution I got the following summary, which shows that the distinct operations did not suffer from the skew in the data once the first job had processed it:

Job Stats (time in seconds): 
     JobId   Maps Reduces MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs 
job_201206061712_0244 669  45  75  8  13  376  18  202  grouped_a,grouped_b,grouped_c,grouped_d,records,selected  DISTINCT,MULTI_QUERY 
job_201206061712_0245 1  1  3  3  3  12  12  12  grouped_c_count GROUP_BY,COMBINER 
job_201206061712_0246 1  1  3  3  3  12  12  12  grouped_b_count GROUP_BY,COMBINER 
job_201206061712_0247 5  1  48  27  33  30  30  30  grouped_a_count GROUP_BY,COMBINER 
job_201206061712_0248 1  1  3  3  3  12  12  12  grouped_d_count GROUP_BY,COMBINER 
job_201206061712_0249 4  1  3  3  3  12  12  12  mrg,out HASH_JOIN  ..., 
Input(s): 
Successfully read 52215768 records (44863559501 bytes) from: "...." 

Output(s): 
Successfully stored 9 records (181 bytes) in: "..." 

From the job DAG we can see that the GROUP BY operations execute in parallel:

Job DAG: 
job_201206061712_0244 ->  job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245, 
job_201206061712_0248 ->  job_201206061712_0249, 
job_201206061712_0246 ->  job_201206061712_0249, 
job_201206061712_0247 ->  job_201206061712_0249, 
job_201206061712_0245 ->  job_201206061712_0249, 
job_201206061712_0249 

It works on my dataset, where a single value of the group key (in column g) accounts for 95% of the data. It also got rid of the memory-related exceptions.
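If the problem is reducer count rather than pure skew, the standard knob in Pig is the `PARALLEL` clause (or the `default_parallel` session property), which sets the number of reducers for blocking operators such as GROUP and JOIN. A brief sketch, reusing the relation names from the answer above:

-- Session-wide default reducer count for all blocking operators
SET default_parallel 20;

-- Override for one operator: this GROUP runs with 40 reducers
grouped_a_count = GROUP grouped_a BY g PARALLEL 40;

Note that `PARALLEL` spreads distinct keys across more reducers, but it cannot split one hot key, so it complements rather than replaces the decomposition shown above.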

+0

What a wonderful answer! What NoSQL projects are you working on these days? –

+0

Very nice answer! Thanks for sharing. –

0

I recently ran into a problem with this join: if the group key contains nulls in one of the relations, the affected records are silently dropped by the join.
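This is standard behavior for Pig inner joins: tuples whose join key is null never match and are discarded. One workaround (a sketch, assuming the key `g` is a chararray and `'__NULL__'` does not occur in real data) is to substitute a sentinel for null keys before the join, using Pig's bincond operator:

-- Guard the key so null groups survive the inner JOIN
safe_a = FOREACH grouped_a_count GENERATE (g IS NULL ? '__NULL__' : g) AS g, a_count;
safe_b = FOREACH grouped_b_count GENERATE (g IS NULL ? '__NULL__' : g) AS g, b_count;
mrg    = JOIN safe_a BY g, safe_b BY g;

Alternatively, a FULL OUTER join keeps unmatched rows, but Pig's outer joins only support two relations at a time, so the four-way merge above would have to be chained pairwise.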