2016-06-22 18 views

Implementing A-B with a LEFT JOIN in Cascading, given two data sets A and B (each with a single column, ID)

cat A

    1 
    2 
    3 
    4 
    5 
    6 
    7 

cat B 
4 
5 
2 
8 
18 
19 
2197 

A-B (expected result)
1 
3 
6 
7 

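The subtraction is done in 2 steps.

Step 1: A LEFT JOIN B BY ID. This gives a data set with 2 columns: the first column has every entry from data set A, and the second column has only the matching entries from B.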
1 
2 2 
3 
4 4 
5 5 
6 
7 

Step 2: filter the data set from step 1, keeping only the records whose second field is null. That gives A-B using a LEFT JOIN.
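To make the two steps concrete outside of Cascading, here is a minimal plain-Java sketch of the same idea. The class and method names (`LeftJoinDifference`, `leftJoin`, `aMinusB`) are made up for illustration, and it assumes the IDs in B are unique, so each A row joins to at most one B row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LeftJoinDifference {

    /** One joined row: the ID from A plus the matching ID from B, or null when B has no match. */
    static final class Row {
        final Integer aId;
        final Integer bId;

        Row(Integer aId, Integer bId) {
            this.aId = aId;
            this.bId = bId;
        }
    }

    /** Step 1: A LEFT JOIN B on ID. Every row of A is kept; bId stays null when unmatched. */
    static List<Row> leftJoin(List<Integer> a, List<Integer> b) {
        Set<Integer> bIds = new HashSet<>(b);
        List<Row> joined = new ArrayList<>();
        for (Integer id : a) {
            joined.add(new Row(id, bIds.contains(id) ? id : null));
        }
        return joined;
    }

    /** Step 2: keep only the rows whose B column is null, then project the A column. */
    public static List<Integer> aMinusB(List<Integer> a, List<Integer> b) {
        List<Integer> result = new ArrayList<>();
        for (Row row : leftJoin(a, b)) {
            if (row.bId == null) {
                result.add(row.aId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> b = Arrays.asList(4, 5, 2, 8, 18, 19, 2197);
        System.out.println(aMinusB(a, b)); // prints [1, 3, 6, 7]
    }
}
```

The point to notice is the direction of the filter in step 2: rows with a null second field are the ones that survive.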

I was able to implement step 1, but I am unable to implement step 2. Below is the code for step 1:

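import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.pipe.joiner.LeftJoin;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class AMinusB { 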

public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap, 
     Tap outputTap) { 
    Pipe bpipe = new Pipe("b_pipe"); 
    Pipe apipe = new Pipe("a_pipe"); 
    Fields b_user_id = new Fields("B_id"); 
    Fields a_user_id = new Fields("A_id"); 

    Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id, 
      new LeftJoin()); 
    Pipe retainPipe = new Pipe("retain", joinPipe); 
    retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id")); 

    Pipe cdistPipe = new Pipe("UniquePipe", retainPipe); 

    Fields selector = new Fields("A_id", "B_id"); 

    cdistPipe = new Unique(cdistPipe, selector); 

    FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap) 
      .addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap) 
      .setName("A-B using left outer join"); 
    return flowDef; 
} 

public static void main(String[] args) { 
    String Apath = "path to data set A"; 
    String Bpath = "path to data set B"; 
    String outputPath = "path to output"; 
    Properties properties = new Properties(); 
    // Point Cascading at the jar that contains this class.
    AppProps.setApplicationJarClass(properties, 
      AMinusB.class); 
    FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties); 

    Fields A = new Fields("A_id"); 
    Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath); 

    Fields B = new Fields("B_id"); 
    Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath); 

    Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath); 

    FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap); 
    flowConnector.connect(flowDefLeftJoin).complete(); 

} 

}

Answer


Check the source code of the FilterNull operation.

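// FilterNull would drop the rows whose B_id is null; FilterNotNull on B_id alone keeps exactly those rows.
cdistPipe = new Each(cdistPipe, new Fields("B_id"), new FilterNotNull());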
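A caveat on filter direction: a Cascading `Filter` drops a tuple when its `isRemove` method returns true, and `FilterNull`, as its documentation describes it, returns true when an argument value is null. The plain-Java model below has no Cascading dependency, and `NullFilterModel` and its members are hypothetical names. It applies both directions to the step 1 output to show which one leaves A-B.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class NullFilterModel {

    /** Models "remove the tuple when the argument is null" (the FilterNull direction). */
    public static final Predicate<Integer> REMOVE_NULL = b -> b == null;

    /** Models the inverse: "remove the tuple when the argument is not null". */
    public static final Predicate<Integer> REMOVE_NOT_NULL = REMOVE_NULL.negate();

    /** The step 1 output from the question: {A_id, B_id}, with null where B had no match. */
    public static List<Integer[]> joinedRows() {
        return Arrays.asList(
                new Integer[] {1, null}, new Integer[] {2, 2}, new Integer[] {3, null},
                new Integer[] {4, 4}, new Integer[] {5, 5}, new Integer[] {6, null},
                new Integer[] {7, null});
    }

    /** Applies a filter to the B_id argument only; rows for which it returns true are dropped. */
    public static List<Integer> apply(List<Integer[]> rows, Predicate<Integer> isRemove) {
        List<Integer> keptAIds = new ArrayList<>();
        for (Integer[] row : rows) {
            if (!isRemove.test(row[1])) {
                keptAIds.add(row[0]);
            }
        }
        return keptAIds;
    }

    public static void main(String[] args) {
        System.out.println(apply(joinedRows(), REMOVE_NULL));     // prints [2, 4, 5]
        System.out.println(apply(joinedRows(), REMOVE_NOT_NULL)); // prints [1, 3, 6, 7]
    }
}
```

In Cascading itself the null-keeping direction should correspond to `FilterNotNull`, or `Not` wrapped around `FilterNull`, applied to `B_id` only. Treat that mapping as an assumption to verify against the Cascading version in use.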