1
我想知道是否可以在Cascading中進行笛卡爾連接。 如果任何人都可以給一個簡單明瞭的例子來理解笛卡爾連接級聯?在級聯中實現笛卡爾連接
我想知道是否可以在Cascading中進行笛卡爾連接。 如果任何人都可以給一個簡單明瞭的例子來理解笛卡爾連接級聯?在級聯中實現笛卡爾連接
用做笛卡爾以下組件加入:
/**
* Created by dhruv.pancholi on 16/01/17.
*/
public class CartesianJoin extends SubAssembly {
public static class CommonFieldAddOperation extends BaseOperation implements Function, Serializable {
public CommonFieldAddOperation(Fields outputFields) {
super(outputFields);
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
TupleEntry arguments = functionCall.getArguments();
// Copying the same tuple from input
Tuple tuple = new Tuple(arguments.getTuple());
// Adding 1 for joining on this field
tuple.add(1);
functionCall.getOutputCollector().add(tuple);
}
}
public CartesianJoin(Pipe leftPipe, Fields leftFields, Pipe rightPipe, Fields rightFields) {
// Adding 1 at the end of each tuple for joining
leftPipe = new Each(leftPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(leftFields, new Fields("cartesian_common"))), Fields.RESULTS);
// Adding 1 at the end of each tuple for joining
rightPipe = new Each(rightPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(rightFields, new Fields("cartesian_common_"))), Fields.RESULTS);
// Joining on the 1 which was added in both the pipes
Pipe joinPipe = new CoGroup(leftPipe, new Fields("cartesian_common"), rightPipe, new Fields("cartesian_common_"), new InnerJoin());
// Keeping only the original fields
joinPipe = new Retain(joinPipe, Fields.merge(leftFields, rightFields));
// Adding output pipe of the sub-assembly
setTails(joinPipe);
}
}
使用下面的代碼片段中的主要功能或任何流程的定義:
Pipe joinPipe = new CartesianJoin(leftPipe, new Fields("id", "name"), rightPipe, new Fields("id_", "name_"));
leftPipe
id name
1 dhruv
3 arun
righ TPIPE
id_ name_
1 dhruv
2 gaj
joinPipe
id name id_ name_
3 arun 2 gaj
3 arun 1 dhruv
1 dhruv 2 gaj
1 dhruv 1 dhruv
檢查這個帖子:http://stackoverflow.com/questions/14681506/cartesian-product-in-cascading – chinglun