2016-04-07 22 views

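Implementing a Cartesian join in Cascading

I would like to know whether it is possible to perform a Cartesian join in Cascading. Could anyone give a simple, clear example that shows how a Cartesian join works in Cascading?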


Check this post: http://stackoverflow.com/questions/14681506/cartesian-product-in-cascading – chinglun

Answer


Use the following sub-assembly to perform a Cartesian join:

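import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

import java.io.Serializable;

/**
 * Created by dhruv.pancholi on 16/01/17.
 */
public class CartesianJoin extends SubAssembly {

    /** Copies each incoming tuple and appends a constant 1 to use as the join key. */
    public static class CommonFieldAddOperation extends BaseOperation implements Function, Serializable {

        public CommonFieldAddOperation(Fields outputFields) {
            super(outputFields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();

            // Copy the input tuple unchanged
            Tuple tuple = new Tuple(arguments.getTuple());

            // Append the constant 1; both pipes are later joined on this field
            tuple.add(1);

            functionCall.getOutputCollector().add(tuple);
        }
    }

    // leftFields and rightFields must list every field of their pipe, in order,
    // because the whole incoming tuple (Fields.ALL) is copied into the declared output fields.
    public CartesianJoin(Pipe leftPipe, Fields leftFields, Pipe rightPipe, Fields rightFields) {

        // Append the constant join key to every tuple of the left pipe
        leftPipe = new Each(leftPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(leftFields, new Fields("cartesian_common"))), Fields.RESULTS);

        // Append the constant join key to every tuple of the right pipe (the field name must differ from the left one)
        rightPipe = new Each(rightPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(rightFields, new Fields("cartesian_common_"))), Fields.RESULTS);

        // Join on the constant key: every left tuple matches every right tuple
        Pipe joinPipe = new CoGroup(leftPipe, new Fields("cartesian_common"), rightPipe, new Fields("cartesian_common_"), new InnerJoin());

        // Keep only the original fields, dropping the two helper key fields
        joinPipe = new Retain(joinPipe, Fields.merge(leftFields, rightFields));

        // Register the joined pipe as the output (tail) of the sub-assembly
        setTails(joinPipe);
    }

}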

Use the following snippet in your main function, or wherever the flow is defined:

Pipe joinPipe = new CartesianJoin(leftPipe, new Fields("id", "name"), rightPipe, new Fields("id_", "name_")); 

leftPipe

id name 
1 dhruv 
3 arun 

rightPipe

id_ name_ 
1 dhruv 
2 gaj 

joinPipe

id name id_ name_ 
3 arun 2 gaj 
3 arun 1 dhruv 
1 dhruv 2 gaj 
1 dhruv 1 dhruv
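
To see why joining on a constant field yields the Cartesian product, the same idea can be sketched in plain Java, without Cascading. This is only an illustration of the technique, not how Cascading executes the join internally; the class name ConstantKeyJoinSketch, the COMMON_KEY constant and both helper methods are made up for this example. Every row is grouped under the same key, so the inner join pairs each left row with each right row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConstantKeyJoinSketch {

    // Hypothetical constant join key, mirroring the 1 appended to each tuple in the answer
    private static final Integer COMMON_KEY = 1;

    /** Groups rows by join key. Every row gets the same key, so there is a single group. */
    public static Map<Integer, List<List<String>>> keyByConstant(List<List<String>> rows) {
        Map<Integer, List<List<String>>> grouped = new HashMap<>();
        for (List<String> row : rows) {
            grouped.computeIfAbsent(COMMON_KEY, k -> new ArrayList<>()).add(row);
        }
        return grouped;
    }

    /** Inner join on the constant key; the key itself is never added to the output rows. */
    public static List<List<String>> cartesianJoin(List<List<String>> left, List<List<String>> right) {
        Map<Integer, List<List<String>>> leftGroups = keyByConstant(left);
        Map<Integer, List<List<String>>> rightGroups = keyByConstant(right);

        List<List<String>> joined = new ArrayList<>();
        for (Map.Entry<Integer, List<List<String>>> entry : leftGroups.entrySet()) {
            List<List<String>> matches = rightGroups.get(entry.getKey());
            if (matches == null) {
                continue; // inner join: a key with no rows on the right produces nothing
            }
            for (List<String> leftRow : entry.getValue()) {
                for (List<String> rightRow : matches) {
                    List<String> row = new ArrayList<>(leftRow);
                    row.addAll(rightRow);
                    joined.add(row);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Same sample data as the leftPipe and rightPipe listings
        List<List<String>> left = Arrays.asList(
                Arrays.asList("1", "dhruv"),
                Arrays.asList("3", "arun"));
        List<List<String>> right = Arrays.asList(
                Arrays.asList("1", "dhruv"),
                Arrays.asList("2", "gaj"));

        for (List<String> row : cartesianJoin(left, right)) {
            System.out.println(String.join(" ", row));
        }
    }
}
```

Running main prints the same four combinations as the joinPipe listing, although the row order can differ from what Cascading produces. Note that all rows share one key, so in a distributed join the whole product is computed in a single group; this approach is best suited to cases where at least one side is small.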