0
有兩個數據集A和B(具有單柱 - ID)使用實施AB功能與LEFT級聯JOIN
Cat A
1
2
3
4
5
6
7
cat B
4
5
2
8
18
19
2197
Cat A-B
1
3
6
7
該減法以2進行步驟 步驟1:加入BY ID,LEFT JOIN乙BY ID) 這會得到具有2列,其中第1列將具有用於數據集A和第二列中的所有條目將只有從乙
匹配條目1
2 2
3
4 4
5 5
6
7
步驟2的數據集:篩選數據集從step1記錄第二個字段所在的記錄null 因此,我們通過使用LEFT JOIN實現了A-B。
我能夠執行步驟1,但我無法實施步驟2 下面是步驟1中
public class AMinusB {
public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap,
Tap outputTap) {
Pipe bpipe = new Pipe("b_pipe");
Pipe apipe = new Pipe("a_pipe");
Fields b_user_id = new Fields("B_id");
Fields a_user_id = new Fields("A_id");
Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id,
new LeftJoin());
Pipe retainPipe = new Pipe("retain", joinPipe);
retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id"));
Pipe cdistPipe = new Pipe("UniquePipe", retainPipe);
Fields selector = new Fields("A_id", "B_id");
cdistPipe = new Unique(cdistPipe, selector);
FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap)
.addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap)
.setName("A-B using left outer join");
return flowDef;
}
public static void main(String[] args) {
String Apath = "path to data set A";
String Bpath = "path to data set B";
String outputPath = "path to output";
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties,
LocationsNumForAProduct.class);
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);
Fields A = new Fields("A_id");
Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath);
Fields B = new Fields("B_id");
Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath);
Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath);
FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap);
flowConnector.connect(flowDefLeftJoin).complete();
}
}