2017-06-23 27 views
0

我想使管道的一部分依賴於管道創建過程的動態輸入。我的問題是,推薦的方式是什麼?在Java中的Google Dataflow 1.9.0中多次使用DoFn

如果我有以下的(降低的)代碼:

public static void createPipeline(){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(new DoFunction()); 
    p.apply(new AnotherSource()).apply(new DoFunction()); 
    p.run; 
} 

現在DoFunction應的參數。 我應該實例化一次並將它傳遞給函數createPipeline,還是應該使用類參數並實例化它? 版本與實例化的功能:

public static void createPipeline(DoFn dofn){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(dofn); 
    p.apply(new AnotherSource()).apply(dofn); 
    p.run; 
} 

版本與類參數:

public static void createPipeline(Class<?> fnClass){ 
    Pipeline p = TestPipeline.create(); 
    p.apply(new Source()).apply(fnClass.newInstance()); 
    p.apply(new AnotherSource()).apply(fnClass.newInstance()); 
    p.run; 
} 

回答

0

有沒有需要傳遞一個Class - 你可以通過DoFn

public static void createPipeline(DoFn<Foo, Baz> dofn) { 
    Pipeline pipeline = TestPipeline.create(); 

    pipeline 
     .apply(Read.from(new Source())) 
     .apply(ParDo.of(dofn)); 

    pipeline 
     .apply(Read.from(new AnotherSource())) 
     .apply(ParDo.of(dofn)); 

    pipeline.run(); 
} 

你甚至可以通過完全實例化ParDo.of(doFn)變換和多次使用它。

如果您實際上不打算使用從ParDo返回的PCollection,那麼您也可以將輸入合併在一起。

相關問題