2011-11-08

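Pig puzzle: rewriting a reducer as a simple Pig script?

There are account ids, each with a timestamp, grouped by username. For each of these username groups I want all pairs of (oldest account, other account).

I have a Java reducer that does this. Can I rewrite it as a simple Pig script?

Schema: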

{group:(username),A: {(id , create_dt)}}

Input:

(batman,{(id1,100), (id2,200), (id3,50)}) 
(lulu ,{(id7,100), (id9,50)}) 

Desired output:

(batman,{(id3,id1), (id3,id2)}) 
(lulu ,{(id9,id7)}) 

Answer


Not that anyone seems to care, but here goes. You have to create a UDF:

desired = foreach my_input generate group as n, FIND_PAIRS(A) as pairs_bag; 

And the UDF (FIND_PAIRS above is assumed to be an alias created with DEFINE for this FindPairs class):

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class FindPairs extends EvalFunc<DataBag> {
    @Override
    public DataBag exec(Tuple input) throws IOException {
        long pivotCreatedDate = Long.MAX_VALUE;
        String pivot = null;

        // First pass: find the account with the minimal create_dt (the pivot).
        DataBag accountsBag = (DataBag) input.get(0);
        for (Tuple account : accountsBag) {
            String accountId = account.get(0).toString();
            // create_dt is field 1 in the (id, create_dt) schema from the question
            long creationDate = Long.parseLong(account.get(1).toString());
            if (creationDate < pivotCreatedDate) {
                pivot = accountId;
                pivotCreatedDate = creationDate;
            }
        }

        // Second pass: emit one (pivot, other) pair per remaining account.
        DataBag allPairs = BagFactory.getInstance().newDefaultBag();
        if (pivot != null) {
            for (Tuple account : accountsBag) {
                String accountId = account.get(0).toString();
                if (!accountId.equals(pivot)) {
                    // we don't want any self-pairs
                    Tuple output = TupleFactory.getInstance().newTuple(2);
                    // oldest account first, matching the desired output
                    output.set(0, pivot);
                    output.set(1, accountId);
                    allPairs.add(output);
                }
            }
        }
        return allPairs;
    }

And if you want to play really nice, add this:

    /**
     * Letting pig know that we emit a bag with tuples, each representing a pair of accounts
     */
    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema pairSchema = new Schema();
            pairSchema.add(new FieldSchema(null, DataType.BYTEARRAY));
            pairSchema.add(new FieldSchema(null, DataType.BYTEARRAY));
            return new Schema(
                    new FieldSchema(null, new Schema(pairSchema), DataType.BAG));
        } catch (Exception e) {
            return null;
        }
    }

}
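
The pairing rule from the question (oldest account first, then every other account) can also be checked without a Pig or Hadoop runtime. The following is only a minimal plain-Java sketch of that rule, not the UDF itself: the class `OldestAccountPairs`, the `Account` holder, and the `pairsFor` method are hypothetical names introduced for illustration, and it assumes `create_dt` fits in a `long`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OldestAccountPairs {

    /** Hypothetical holder standing in for one (id, create_dt) tuple. */
    static final class Account {
        final String id;
        final long createDt;

        Account(String id, long createDt) {
            this.id = id;
            this.createDt = createDt;
        }
    }

    /** Returns "(oldest,other)" pairs for the accounts of one username group. */
    static List<String> pairsFor(List<Account> accounts) {
        // First pass: find the account with the smallest create_dt.
        Account oldest = null;
        for (Account a : accounts) {
            // Strict '<' keeps the first account seen when timestamps tie.
            if (oldest == null || a.createDt < oldest.createDt) {
                oldest = a;
            }
        }

        // Second pass: pair the oldest account with every other account.
        List<String> pairs = new ArrayList<>();
        for (Account a : accounts) {
            if (a != oldest) { // skip the self-pair
                pairs.add("(" + oldest.id + "," + a.id + ")");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Sample groups taken from the question's input.
        List<Account> batman = Arrays.asList(
                new Account("id1", 100), new Account("id2", 200), new Account("id3", 50));
        List<Account> lulu = Arrays.asList(
                new Account("id7", 100), new Account("id9", 50));

        System.out.println("batman " + pairsFor(batman)); // batman [(id3,id1), (id3,id2)]
        System.out.println("lulu " + pairsFor(lulu));     // lulu [(id9,id7)]
    }
}
```

An empty group yields an empty list, and a group with a single account yields no pairs, since the only account is the oldest one.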