2014-02-20 22 views

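Math on bags

I have bags of tuples in which one field needs to be normalized to zero per bag: I take the MIN of that field within the bag and subtract that min from every tuple.

Can this be done without flattening?

The real case is slightly more complicated, because I only want the min over the subset of tuples that satisfy a certain condition.

Here is some sample code that does not work: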

data = LOAD 'data.csv' USING PigStorage(',') 
    AS (x:int, y:int, z:int); 

data_grouped = GROUP data BY x; 

data_normal = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE data.(x, y, z-smallest_good_z); 
} 

DESCRIBE data_normal; 

rmf data_normal 
STORE data_normal INTO 'data_normal' USING PigStorage(','); 

And a sample data.csv:

0,0,1 
0,0,2 
0,0,3 
0,1,0 
0,2,-1 
1,2,3 
1,3,4 
1,4,5 
1,0,5 
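
To make the goal concrete: within each x group, the smallest z among the rows with y == 0 is subtracted from every z in that group. The following is only a plain-Java sketch of that arithmetic, not a Pig solution. The names `BagNormalizer` and `normalize` are hypothetical, and dropping groups that have no y == 0 row is an assumption the question does not settle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Pig): per-group normalization of z in plain Java.
class BagNormalizer {

    // Each row is {x, y, z}. Returns new rows {x, y, z - min}, where min is the
    // smallest z among rows of the same x group that have y == 0.
    // Assumption: groups without any y == 0 row are dropped from the output.
    static List<int[]> normalize(List<int[]> rows) {
        // First pass: conditional minimum of z per x group.
        Map<Integer, Integer> minByGroup = new HashMap<>();
        for (int[] r : rows) {
            if (r[1] == 0) {
                minByGroup.merge(r[0], r[2], Integer::min);
            }
        }
        // Second pass: subtract the group's minimum from every row, input order preserved.
        List<int[]> out = new ArrayList<>();
        for (int[] r : rows) {
            Integer min = minByGroup.get(r[0]);
            if (min != null) {
                out.add(new int[] {r[0], r[1], r[2] - min});
            }
        }
        return out;
    }
}
```

For the sample data above, the x = 0 group has a conditional minimum of 1, so its z values become 0, 1, 2, -1, -2; the x = 1 group has a conditional minimum of 5, so its z values become -2, -1, 0, 0.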

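Please tell me I don't have to group, MIN, flatten, subtract, and regroup! Here is the approach I'm using now, which I would like to get rid of: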

data = LOAD 'data.csv' USING PigStorage(',') AS 
    (x:int, y:int, z:int); 

data_grouped = GROUP data BY x; 

data_n0 = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE FLATTEN(data.(x, y, z)), smallest_good_z AS smz:int; 
} 

data_n1 = FOREACH data_n0 GENERATE x,y,z-smz; 

data_normal = GROUP data_n1 BY x; 

Oh, there's a cat pun lurking somewhere in this question's title... :D – TC1

Answer

Unfortunately, you can only do this with a UDF. Here is an example:

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Subtracts a constant from one field of every tuple in a bag.
// Arguments: (inputBag, position, toSubtract)
public class MinusToAllInBag extends EvalFunc<DataBag> {

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 3) {
            System.err.println("Inputs are ({inputBag}, position, toSubtract)");
            return null;
        }
        try {
            Object o = input.get(0);
            if (!(o instanceof DataBag)) {
                throw new RuntimeException("parameter 1 must be a databag");
            }
            DataBag inputBag = (DataBag) o;
            Integer pos = (Integer) input.get(1);        // 0-based field index
            Float toSubtract = (Float) input.get(2);
            if (pos == null || toSubtract == null) {
                // e.g. MIN over an empty filtered bag yields null
                return null;
            }
            for (Tuple row : inputBag) {
                Float value = (Float) row.get(pos);
                if (value != null) {
                    // Tuples are modified in place; null fields are left untouched
                    row.set(pos, value - toSubtract);
                }
            }
            // Return only the modified bag, not the whole argument tuple
            return inputBag;
        } catch (Exception e) {
            System.err.println("Failed to process input; error - " + e.getMessage());
            return null;
        }
    }
}

And the Pig script:

REGISTER libs.jar; 

data = LOAD 'data.csv' USING PigStorage(',') AS 
    (x:int, y:int, z:float); 

data_grouped = GROUP data BY x; 

data_n0 = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE group, MinusToAllInBag(data, 2, (float)smallest_good_z); 
} 

dump data_n0; 

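Can this UDF approach be extended to work on a bag of tuples whose size you don't know in advance? In other words, always subtract a value from the nth position of any tuple, regardless of whether it has n fields or n + 1000 fields? In my case, I really can't maintain a UDF that has to be edited every time I add a field to my tuples. –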


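Yes, that is what the UDF's second argument is for. The first argument is the bag, the second is pos, the 0-based position of the field within the bag's tuples, and the third is the number to subtract. For now, all the numbers have to be floats. In the example `MinusToAllInBag(data, 2, (float)smallest_good_z)`, index 2 is the third column if you count from 1. – alexeipab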
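
The positional logic described in this reply does not depend on how wide the tuples are. As a rough illustration only, here is a plain-Java sketch of the same inner loop with `List<Object>` standing in for a Pig tuple. The names `PositionalSubtract` and `subtractAt` are hypothetical, and skipping rows that are too short and accepting any `Number` (rather than only floats) are assumptions that go beyond the UDF above.

```java
import java.util.List;

// Hypothetical stand-in for the UDF's inner loop; a List<Object> plays the role of a Pig tuple.
class PositionalSubtract {

    // Subtracts toSubtract from the field at index pos (0-based) of every row, in place.
    // Assumption: rows too short to have that field, and null or non-numeric fields,
    // are left untouched. Numeric fields are converted to float before subtracting.
    static void subtractAt(List<List<Object>> bag, int pos, float toSubtract) {
        for (List<Object> row : bag) {
            if (pos < 0 || pos >= row.size()) {
                continue;
            }
            Object field = row.get(pos);
            if (field instanceof Number) {
                row.set(pos, ((Number) field).floatValue() - toSubtract);
            }
        }
    }
}
```

Because only the index is passed in, adding more fields to the tuples later would not require touching this code, which is the point the reply makes about the second argument.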