2016-08-04 35 views
0

以前,PCollection格式化過的結果;我用下面的代碼在大查詢插入行:從Dataflow插入BigQuery中的數據

    // OPTION 1 
PCollection<TableRow> formattedResults = .... 
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName) 
          .withSchema(tableSchema) 
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

而且所有的行直接插在BigQuery中,都好到這裏。但現在我已經開始動態識別表名和行,以便我如下創建PCollection:(字符串將表名稱,然後它作爲值行)

PCollection<KV<String, TableRow>> tableRowMap // OPTION 2 

而且,我創建的行組這將在同一個表中去,因爲:

PCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3 

其中key(字符串)是BQ表名和值在BQ要插入的行的列表。

使用選項1,我可以使用上面顯示的代碼輕鬆地在BQ中插入行,但相同的代碼不能與OPTION 2或OPTION 3一起使用,因爲在這種情況下,我的表名是映射中的鍵。有沒有辦法使用OPTION 2或OPTION 3在表格中插入行。任何鏈接或代碼示例都會有很大的幫助。

回答

1

Dataflow正在向每個窗口的表寫入最近的東西(並且您可以創建自己的BoundedWindow子類和WindowFn以在窗口中包含所需的任何數據)。爲此,請使用

to(SerializableFunction<BoundedWindow,String> tableSpecFunction) 

on BigQueryIO.Write。

請注意,此功能使用BigQuery的流式上載功能,每個表限制爲100MB/s。另外,上傳不是原子的,因此失敗的批處理作業可能只上傳部分輸出。

-1

您還可以選擇創建自己的DoFn,它直接將數據插入bigquery,而不是依賴BigQueryIO.Write。 從技術上講,您需要創建BigQueryTableInserter,您可以使用insertAll(TableReference ref, List<TableRow> rowList)將東西插入到所需的表格中。

您可以使用像創建TableReference: new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

這是不是100%推薦BigQueryIO做一些不錯的東西分裂是需要插入到最大化吞吐量行和正確處理重試。

相關問題