2017-10-18 106 views
0

假設我通過SideOutputs創建了兩個輸出PCollections,並且取決於某些條件,我只想將其中一個寫入BigQuery。這個怎麼做?將一個特定的PCollection寫入BigQuery

基本上我的用例是我試圖使Write_Append和Write_Truncate動態。我從我在BigQuery中維護的配置表中獲取信息(append/truncate)。所以根據我在配置表中的內容,我必須應用截斷或追加。

因此,使用SideOutputs我可以創建兩個PCollections(分別爲Append和Truncate),其中一個將爲空。具有所有行的那個必須寫入BigQuery。這種方法是否正確?

,我正在使用的代碼:

final TupleTag<TableRow> truncate = 
        new TupleTag<TableRow>(){}; 
       // Output that contains word lengths. 
       final TupleTag<TableRow> append = 
        new TupleTag<TableRow>(){}; 

       PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){ 
       @ProcessElement 
       public void processElement(ProcessContext c) 
       { 
        String value = c.sideInput(configView).get(0).toString(); 
        LOG.info("config: "+value); 
        if(value.equals("truncate")){ 
         LOG.info("outputting to truncate"); 
         c.output(new TableRow().set("color", c.element())); 
        } 
        else 
        { 
         LOG.info("outputting to append"); 
         c.output(append,new TableRow().set("color", c.element())); 
        } 
        //c.output(new TableRow().set("color", c.element())); 
       } 
      }).withSideInputs(configView).withOutputTags(truncate, 
        TupleTagList.of(append))); 

       results.get(truncate).apply("truncate",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

       results.get(append).apply("append",BigQueryIO.writeTableRows() 
         .to("projectid:datasetid.tableid") 
         .withSchema(schema) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); 

我需要執行一出兩個。如果我做這兩個表將會被截斷反正。

P.S.我使用的Java SDK(Apache的梁2.1)

+0

這是一個普遍的問題,「這種方法是否正確?」或者你想要一些代碼解決方案? –

+0

@Marcin Zablocki好吧,我也想要一些代碼解決方案 – rish0097

+0

你說你有兩個PCollections,那麼問題是什麼呢?分裂和寫作的方法似乎沒問題。 –

回答

0

我相信你是正確的,如果你的管道包括在所有的BigQuery資料表WRITE_TRUNCATE寫,目前該表將變得更截斷如果沒有數據。在這種情況下,請撥打file a JIRA以支持更多可配置的行爲。

所以,如果你想它有條件地不被截斷,你需要有條件地不包括寫轉換。是否有辦法將條件推到該級別,還是實際上必須從流水線中的其他數據計算條件?

(我能想到的唯一解決方法是使用DynamicDestinations動態選擇要截斷的表的名稱,並截斷一些其他的虛擬空表 - 相反,我可以在回答上一段後詳細說明這一點)

+0

嗨@jkff ...一種將條件推到該級別的方法是我期待的一種解決方案,可以在這裏找到......並且是必須從我從配置表中檢索的數據計算得出的,該配置表表明APPEND或TRUNCATE ...所以如果有一個使用DynamicDestinations的解決方法,我真的很想知道... – rish0097

+0

你可以在你的主程序中說if(condition){p.apply(... APPEND ...) }其他{p.apply(... TRUNCATE ...)}?或者,條件本身是否依賴於管道計算的數據,並且無法通過構建管道的主程序進行評估? – jkff

+0

它依賴於通過流水線計算的數據 – rish0097

相關問題