2017-06-01 75 views

Summing the values of every n rows in Pig

I have data like this:

1:23:0.20 
2:34:0.50 
3:67:0.90 
4:87:0.10 
5:23:0.12 

I want to sum the last-column values of every 2 rows, like this:

0.20+0.50 = 0.70 
0.90+0.10 = 1.0 

and print the result like this:

1:23:0.20:0.70 
2:34:0.50:0.70 
3:67:0.90:1.0 
4:87:0.10:1.0 
5:23:0.12 

This is my Pig script:

data = LOAD '/home/user/Documents/test/test.txt' using PigStorage(':') AS (tag:int,rssi:chararray,weightage:chararray,seqnum:int); 
B = FOREACH (GROUP data ALL) { 
A_ordered = ORDER data BY rssi; 
GENERATE FLATTEN(CUSTOM_UDF(A_ordered)); 
} 

I tried to do this with a Java UDF, but it does not work correctly. This is what I tried:

public List<String> sumValues() { 
    List<String> processedList = new ArrayList<>(); 
    if (entries == null) { 
     return processedList; 
    } else { 
     double columnSum = 0; 
     List<String> tempList = new ArrayList<>(); 
     int length = entries.size(); 
     for (int index = 1; index <= length; index++) { 
      tempList.add(entries.get(index - 1)); 
      String[] splitValues = entries.get(index - 1).split(DELIMITER); 
      if (splitValues.length >= MIN_SPLIT_STRING_LENGTH) { 

       try { 
        double lastValue = Double.parseDouble(splitValues[WEIGHTAGE_INDEX]); 
        columnSum = columnSum + lastValue; 

        if ((index % ROWS_TO_BE_SUMMED == 0) || (index == length)) { 
         for (String tempString : tempList) { 
          processedList.add(tempString + ":" + columnSum); 
         } 
         tempList.clear(); // Clear the temporary array 
         columnSum = 0; 
        } 
       } catch (NumberFormatException e) { 
        System.out.println("Invalid weightage"); 
       } 
      } else { 
       System.out.println("Invalid input"); 
      } 
     } 
    } 
    return processedList; 
} 


@Override 
public String exec(Tuple input) throws IOException { 
    System.out.println("------INSIDE EXEC FUCTION ----" + input); 
    if (input != null && input.size() != 0) { 
     try { 
      String str = (String) input.get(0); 
      if (str != null) { 
       String splitStrings[] = str.split(":"); 
       if (splitStrings != null && splitStrings.length >= 3 && splitStrings[2].equals(EXIT)) { 
        List<String> processedList = sumValues(); 
        String sum = processedList.toString(); 
        System.out.println("SUM VALUE----:" + sum); 
        return sum; 
       } else { 
        System.out.println("INPUT VALUE----:" + str); 
        entries.add(str); 
        return null; 
       } 
      } 
     } catch (Exception e) { 
      return null; 
     } 
    } 
    return null; 
} 
} 

The code above prints an empty result. Any help would be appreciated.

Answers


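In your UDF you receive a tuple (int, chararray, chararray, int) and try to read the first element as a String. Because the code is wrapped in try...catch, you never see the ClassCastException that is certainly thrown there. The data was already split into fields when you loaded it, so you do not need to split it again.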
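The effect described above can be reproduced without Pig. The sketch below is only an illustration, not the asker's UDF: a `List<Object>` stands in for Pig's `Tuple` (an assumption made so the example runs on its own), and the names `SwallowedCastDemo`, `firstFieldSwallowing`, and `firstFieldSafe` are hypothetical. It shows that casting an `Integer` field to `String` throws `ClassCastException`, and that a catch-all block returning `null` hides it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; a List<Object> stands in for a Pig Tuple.
class SwallowedCastDemo {

    // Mirrors the pattern in the question: cast the first field to String
    // and return null on any exception, which hides the ClassCastException.
    static String firstFieldSwallowing(List<Object> tuple) {
        try {
            return (String) tuple.get(0);
        } catch (Exception e) {
            return null; // the real cause is lost here
        }
    }

    // Converts the field without assuming its runtime type.
    static String firstFieldSafe(List<Object> tuple) {
        Object field = tuple.get(0);
        return field == null ? null : String.valueOf(field);
    }

    public static void main(String[] args) {
        // First row of the sample data, already split into typed fields.
        List<Object> row = Arrays.<Object>asList(1, "23", "0.20");
        System.out.println(firstFieldSwallowing(row)); // prints "null"
        System.out.println(firstFieldSafe(row));       // prints "1"
    }
}
```

Logging or rethrowing the exception inside the catch block, instead of returning `null`, would have surfaced the cast failure immediately.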


It does not print an empty result.


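This can be done in Pig itself. Generate another column, f11, from f1: for even rows subtract 1, so that each pair of rows shares the same id. That lets you group the two records by the new column and sum the last column. Then join the grouped result back to the relation and project the required columns.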

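Note: for an n-row sum, build the key with f1 % n_value. For example, f1 - ((f1 - 1) % n_value) gives every run of n consecutive ids the same key, assuming f1 is a gap-free sequence starting at 1.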

A = LOAD 'input.txt' USING PigStorage(':') AS (f1:int,f2:int,f3:double); 
B = FOREACH A GENERATE f1,(f1%2 == 0 ? (f1-1):f1) AS f11,f2,f3; 
C = GROUP B BY f11; 
D = FOREACH C GENERATE group AS f11,SUM(f3) AS Total; 
E = JOIN B BY f11,D BY f11; 
F = FOREACH E GENERATE B::f1,B::f2,B::f3,D::Total; -- after a JOIN, qualify fields with '::' (B.f1 would be read as a scalar projection)

Output

A - Load the data.

1,23,0.20 
2,34,0.50 
3,67,0.90 
4,87,0.10 
5,23,0.12 

B - Add a new second column: the row number, minus 1 for even rows.

1,1,23,0.20 
2,1,34,0.50 
3,3,67,0.90 
4,3,87,0.10 
5,5,23,0.12 

C - Group by the new second column.

1,{(1,23,0.20),(2,34,0.50)} 
3,{(3,67,0.90),(4,87,0.10)} 
5,{(5,23,0.12)} 

D - Generate the sum after grouping.

1,0.70 
3,1.0 
5,0.12 

E - Join the dataset from the previous step with B using the new column.

1,1,23,0.20,1,0.70 
2,1,34,0.50,1,0.70 
3,3,67,0.90,3,1.0 
4,3,87,0.10,3,1.0 
5,5,23,0.12,5,0.12 

F - Get the required columns.

1,23,0.20,0.70 
2,34,0.50,0.70 
3,67,0.90,1.0 
4,87,0.10,1.0 
5,23,0.12,0.12
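
For comparison, the same group, sum, and join-back steps can be sketched in plain Java. This is not part of the Pig answer; the class `RowGroupSum` and its methods are hypothetical names, and the key formula assumes the first field is a gap-free sequence starting at 1. Totals are formatted to two decimals, so the output reads `1.00` where the listing above shows `1.0`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper mirroring the GROUP / SUM / JOIN steps in plain Java.
class RowGroupSum {

    // Key shared by each run of n consecutive ids (ids assumed 1-based, no gaps).
    // For n = 2 this equals (f1 % 2 == 0 ? f1 - 1 : f1).
    static int groupKey(int f1, int n) {
        return f1 - ((f1 - 1) % n);
    }

    // Input rows look like "1:23:0.20"; each output row gets its group total appended.
    static List<String> appendGroupSums(List<String> rows, int n) {
        Map<Integer, Double> totals = new LinkedHashMap<>();
        for (String row : rows) {                      // GROUP by key and SUM
            String[] fields = row.split(":");
            int key = groupKey(Integer.parseInt(fields[0]), n);
            totals.merge(key, Double.parseDouble(fields[2]), Double::sum);
        }
        List<String> out = new ArrayList<>();
        for (String row : rows) {                      // JOIN totals back on the key
            int key = groupKey(Integer.parseInt(row.split(":")[0]), n);
            out.add(row + ":" + String.format(Locale.ROOT, "%.2f", totals.get(key)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1:23:0.20", "2:34:0.50", "3:67:0.90", "4:87:0.10", "5:23:0.12");
        for (String line : appendGroupSums(rows, 2)) {
            System.out.println(line);
        }
    }
}
```

As in step F, the unpaired last row receives its own value as the total. Passing a different `n` changes the group size without touching the rest of the code.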