2013-03-10 9 views

I am a new Pig user. Transforming a Pig schema into the desired form

I have an existing schema that I want to modify. My source data has the following 6 columns:

Name  Type Date  Region Op Value 
----------------------------------------------------- 
john  ab  20130106 D   X  20 
john  ab  20130106 D   C  19 
john  ab  20130106 D   T  8 
john  ab  20130106 E   C  854 
john  ab  20130106 E   T  67 
john  ab  20130106 E   X  98 

And so on. The value of Op is always one of C, T, or X.

I basically want to reshape my data into 7 columns, like this:

Name  Type Date  Region OpX OpC OpT 
---------------------------------------------------------- 
john  ab  20130106 D   20  19 8 
john  ab  20130106 E   98  854 67 

The Op column is split into 3 columns, one for each Op value. Each of these columns should contain the matching value from the Value column.

How can I achieve this result in Pig?

Answers

1

One way:

-- Assumes data.txt is comma-delimited.
raw = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray, 
     date:int, region:chararray, op:chararray, value:int); 
-- Sort by op so that each group's values come out in C, T, X order.
A = order raw by op asc; 
B = group A by (name, type, date, region); 
C = foreach B { 
    -- Join the three values into one string, then split it into a 3-field tuple.
    bs = STRSPLIT(BagToString(A.value, ','),',',3); 
    generate flatten(group) as (name, type, date, region), 
    bs.$2 as OpX:chararray, bs.$0 as OpC:chararray, bs.$1 as OpT:chararray; 
} 

describe C; 
C: {name: chararray,type: chararray,date: int,region: chararray,OpX: 
    chararray,OpC: chararray,OpT: chararray} 

dump C; 
(john,ab,20130106,D,20,19,8) 
(john,ab,20130106,E,98,854,67) 
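
The script above picks values by position, so it depends on each group's values arriving in C, T, X order. As a rough illustration of the same pivot outside Pig, here is a plain-Java sketch that routes each value by its op code instead of by position. It is not Pig code; the class name `PivotSketch` and the method `pivot` are made up for this example, and rows are assumed to be six-element string arrays in the column order of the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PivotSketch {

    // Hypothetical helper: turns rows of {name, type, date, region, op, value}
    // into rows of {name, type, date, region, OpX, OpC, OpT}.
    public static List<String[]> pivot(List<String[]> rows) {
        // Key: the four grouping fields joined by commas (assumes the fields
        // themselves contain no commas). Value: slots for X, C and T.
        Map<String, String[]> groups = new LinkedHashMap<>();
        for (String[] r : rows) {
            String key = String.join(",", r[0], r[1], r[2], r[3]);
            String[] slots = groups.computeIfAbsent(key, k -> new String[3]);
            switch (r[4]) {
                case "X": slots[0] = r[5]; break;
                case "C": slots[1] = r[5]; break;
                case "T": slots[2] = r[5]; break;
                default: throw new IllegalArgumentException("Unexpected op: " + r[4]);
            }
        }
        // Emit one 7-column row per group, in first-seen order.
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, String[]> e : groups.entrySet()) {
            String[] k = e.getKey().split(",");
            String[] s = e.getValue();
            out.add(new String[] { k[0], k[1], k[2], k[3], s[0], s[1], s[2] });
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] { "john", "ab", "20130106", "D", "X", "20" },
            new String[] { "john", "ab", "20130106", "D", "C", "19" },
            new String[] { "john", "ab", "20130106", "D", "T", "8" },
            new String[] { "john", "ab", "20130106", "E", "C", "854" },
            new String[] { "john", "ab", "20130106", "E", "T", "67" },
            new String[] { "john", "ab", "20130106", "E", "X", "98" });
        for (String[] row : pivot(rows)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

Because each value is placed by its op code, the input order within a group does not matter in this sketch; a missing op simply leaves its slot null.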

Update:

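If you want to skip the order by, which adds an extra reduce phase to the computation, you can prefix each value with its corresponding op code (relation v below). A custom UDF then sorts the tuple fields and returns them in the desired OpX, OpC, OpT order: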

register 'myjar.jar'; 
A = load 'data.txt' using PigStorage(',') as (name:chararray, type:chararray, 
     date:int, region:chararray, op:chararray, value:int); 
B = group A by (name, type, date, region); 
C = foreach B { 
    v = foreach A generate CONCAT(op, (chararray)value); 
    bs = STRSPLIT(BagToString(v, ','),',',3); 
    generate flatten(group) as (name, type, date, region), 
    flatten(TupleArrange(bs)) as (OpX:chararray, OpC:chararray, OpT:chararray); 
} 

where TupleArrange in myjar.jar looks like this:

.. 
import java.io.IOException; 
import java.util.Arrays; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

public class TupleArrange extends EvalFunc<Tuple> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 

    @Override 
    public Tuple exec(Tuple input) throws IOException { 
        try { 
            Tuple result = tupleFactory.newTuple(3); 
            // The single argument is the 3-field tuple produced by STRSPLIT. 
            Tuple inputTuple = (Tuple) input.get(0); 
            String[] tupleArr = new String[] { 
                (String) inputTuple.get(0), 
                (String) inputTuple.get(1), 
                (String) inputTuple.get(2) 
            }; 
            // Ascending sort on the op prefix gives the order C, T, X. 
            Arrays.sort(tupleArr); 
            // Strip the one-character op prefix and emit as OpX, OpC, OpT. 
            result.set(0, tupleArr[2].substring(1)); 
            result.set(1, tupleArr[0].substring(1)); 
            result.set(2, tupleArr[1].substring(1)); 
            return result; 
        } catch (Exception e) { 
            throw new RuntimeException("TupleArrange error", e); 
        } 
    } 

    // Returns the input schema unchanged; the calling script names the 
    // output fields in its "as (...)" clause. 
    @Override 
    public Schema outputSchema(Schema input) { 
        return input; 
    } 
} 
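
To check the reordering logic without a Pig installation, the core of the UDF can be tried on plain strings. The following is only a sketch: `ArrangeSketch` and `arrange` are hypothetical names, a `String[]` stands in for the Pig tuple, and each element is assumed to start with a one-character op code (C, T, or X).

```java
import java.util.Arrays;

public class ArrangeSketch {

    // Hypothetical stand-in for the UDF body: takes three op-prefixed values
    // in any order and returns them as {OpX, OpC, OpT} without the prefix.
    public static String[] arrange(String[] prefixed) {
        String[] sorted = prefixed.clone();
        Arrays.sort(sorted); // ascending by prefix: C..., T..., X...
        return new String[] {
            sorted[2].substring(1), // X value
            sorted[0].substring(1), // C value
            sorted[1].substring(1)  // T value
        };
    }

    public static void main(String[] args) {
        // Values for region E from the question, deliberately out of order.
        String[] result = arrange(new String[] { "T67", "X98", "C854" });
        System.out.println(Arrays.toString(result)); // prints [98, 854, 67]
    }
}
```

The sort only works as intended while all three op codes are present and each is a single character, since `substring(1)` drops exactly one character.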

Thanks, that worked. – JohnMeek 2013-03-12 02:12:18
