2016-09-30 52 views
2

我有一些文本文件,其中包含要從批處理模式下運行的DataflowPipelineRunner到日期分區的BigQuery表中的數據。我不想在運行時插入到當前分區,而是想根據每行中提到的日期插入分區。 (不幸的是,我不能使用bq命令行工具來直接導入文本文件,因爲我需要轉換一些值。)如何在批量Dataflow作業中使用基於數據本身的日期寫入日期分區的BigQuery表?

我試圖通過從ParDo函數輸出時間戳插入窗口天,然後應用該窗口並輸出後綴$的表名和相應的日期。

BigQueryIO.Write.to(new SerializableFunction<BoundedWindow, String>() { 
    public String apply(BoundedWindow window) { 
    String dayString = DateTimeFormat.forPattern("yyyyMMdd") 
         .withZone(DateTimeZone.forID("Europe/Stockholm")) 
         .print(((IntervalWindow)window).start()); 
    return dataset + "$" + dayString; 
    } 
}) 
.withSchema(schema.getSchema()) 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

當我嘗試運行此操作時,我受到影響by a Dataflow bug。我也found out

批處理模式還不支持每個窗口表。

那麼我怎樣才能寫一個日期分區表作爲分區的指定日期?

+0

這是現在批處理和流模式都可用:http://stackoverflow.com/questions/43505534/writing-different-values-to-different-bigquery-tables-in-apache-beam/43505535 – jkff

回答

1

如果您需要輸出的表格數量相對較少且數量固定,則可以爲每個表格創建一個單獨的BigQueryIO.Write轉換,然後根據日期爲您的數據創建一個單獨的BigQueryIO.Write轉換。如果輸出表的數量非常大,那麼在批量Dataflow支持每個窗口表之前,目前還沒有好的解決方案。

相關問題