2017-08-18 41 views
12

我目前正在編寫一個Java工具來從GCS導入幾個CSV文件到BigQuery。我可以通過bq load輕鬆實現此目的,但我想使用Dataflow作業來完成此操作。因此,我使用Dataflow的Pipeline和ParDo變換器(返回TableRow將其應用於BigQueryIO),並且爲轉換創建了StringToRowConverter()。這裏實際的問題開始 - 我被迫指定目標表的模式,儘管我不想創建一個新表,如果它不存在 - 只是試圖加載數據。所以我不想手動設置TableRow的列名,因爲我有大約600列。如何將CSV文件導入到沒有任何列名或模式的BigQuery表中?

public class StringToRowConverter extends DoFn<String, TableRow> { 

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class); 

public void processElement(ProcessContext c) { 
    TableRow row = new TableRow();   
    row.set("DO NOT KNOW THE COLUMN NAME", c.element()); 
    c.output(row); 
} 
} 

此外,假設表已經存在於BigQuery資料集,我並不需要創建它,也CSV文件包含在一個正確的順序列。

如果對此方案沒有解決方法並且數據加載需要列名,那麼我可以在CSV文件的第一行包含它。

任何幫助將不勝感激。

回答

7

爲避免創建表格,應在管道配置期間使用BigQueryIO.Write的BigQueryIO.Write.CreateDisposition.CREATE_NEVER。來源:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

您不需要事先知道BigQuery表架構,就可以動態發現它。例如,您可以使用BigQuery API(https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get)查詢表模式並將其作爲類StringToRowConverter的參數傳遞。另一種選擇是,假設第一行是標題,則跳過第一行並使用它來正確映射文件的其餘部分。

下面的代碼實現了第二種方法,並將輸出配置爲追加到現有的BigQuery表。

public class DFJob { 

    public static class StringToRowConverter extends DoFn<String, TableRow> { 

     private String[] columnNames; 

     private boolean isFirstRow = true; 

     public void processElement(ProcessContext c) { 
      TableRow row = new TableRow(); 

      String[] parts = c.element().split(","); 

      if (isFirstRow) { 
       columnNames = Arrays.copyOf(parts, parts.length); 
       isFirstRow = false; 
      } else { 
       for (int i = 0; i < parts.length; i++) { 
        row.set(columnNames[i], parts[i]); 
       } 
       c.output(row); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     DataflowPipelineOptions options = PipelineOptionsFactory.create() 
       .as(DataflowPipelineOptions.class); 
     options.setRunner(BlockingDataflowPipelineRunner.class); 

     Pipeline p = Pipeline.create(options); 

     p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv")) 
       .apply(ParDo.of(new StringToRowConverter())) 
       .apply(BigQueryIO.Write.to("myTable") 
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) 
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

     PipelineResult result = p.run(); 
    } 
} 
相關問題