
I want to read a file and write its contents to a BigQuery partitioned table, choosing the partition from a date value in one of the file's fields. For example, if the file contains two dates, July 25 and July 26, Dataflow should write the data into two partitions according to the dates present in the file.

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("");
        options.setTempLocation("gs://stage_location/");
        Pipeline p = Pipeline.create(options);

        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("id").setType("STRING"));
        fields.add(new TableFieldSchema().setName("name").setType("STRING"));
        fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
        fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        PCollection<String> read = p.apply("Read Lines",
            TextIO.read().from("gs://hadoop_source_files/employee.txt"));

        PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] data = c.element().split(",");
                c.output(new TableRow().set("id", data[0]).set("name", data[1])
                    .set("designation", data[2]).set("joindate", data[3]));
            }
        }));

        rows.apply(BigQueryIO.writeTableRows().to(
            new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
                public String getDate(String value) {
                    return "project:dataset.DataFlow_Test$" + value;
                }

                @Override
                public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                    TableRow row = value.getValue();
                    String tableSpec = getDate(row.get("joindate").toString());
                    String tableDescription = "";
                    return new TableDestination(tableSpec, tableDescription);
                }
            })
            .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                @Override
                public TableRow apply(TableRow input) {
                    return input;
                }
            })
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
    }
}

When running the above program I get the following error: Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format. Caused by: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format. Let me know if there are any suggestions.
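The failure points at the string handed to TableDestination: Beam has to parse it back into [project_id]:[dataset_id].[table_id], and a raw joindate value containing characters outside the allowed table-name set (a slash-separated date such as 25/07/2017, for instance) breaks that parsing. Separately, BigQuery's partition decorator after the $ must be an eight-digit yyyyMMdd string. A minimal sketch of normalizing the field before building the spec, assuming (hypothetically) that joindate arrives as dd/MM/yyyy:

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class PartitionSpec {
        // dd/MM/yyyy is an assumed input format for "joindate"; adjust to the real data.
        private static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("dd/MM/yyyy");
        // BigQuery partition decorators must be plain yyyyMMdd digits.
        private static final DateTimeFormatter DECORATOR = DateTimeFormatter.ofPattern("yyyyMMdd");

        static String getDate(String joindate) {
            LocalDate date = LocalDate.parse(joindate, INPUT);
            return "project:dataset.DataFlow_Test$" + date.format(DECORATOR);
        }

        public static void main(String[] args) {
            // Prints: project:dataset.DataFlow_Test$20170725
            System.out.println(getDate("25/07/2017"));
        }
    }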


Welcome to Stack Overflow! Please use comments only for actual comments. You should [edit] the original question to add additional information. – DaveP

Answers


Beam does not currently support date-partitioned tables. See BEAM-2390 for the issue tracking this feature.


I was able to load the data into a partitioned table, based on the date present in the data, using the code below:

rows.apply(BigQueryIO.writeTableRows().to(
    new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            TableRow row = value.getValue();
            // Build the destination as a TableReference instead of a spec string.
            TableReference reference = new TableReference();
            reference.setProjectId("");  // project id omitted here
            reference.setDatasetId("");  // dataset id omitted here
            reference.setTableId("tablename$" + row.get("datefield").toString());
            return new TableDestination(reference, null);
        }
    })
    .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
        @Override
        public TableRow apply(TableRow input) {
            LOG.info("format function:" + input.toString());
            return input;
        }
    })
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
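What makes this version work is that it builds a TableReference directly instead of a spec string, so Beam never has to parse [project_id]:[dataset_id].[table_id] out of the decorated name. The value after the $ still has to be an eight-digit yyyyMMdd date for BigQuery to accept the partition decorator, so a datefield that is not already in that shape needs normalizing first. A sketch of the same destination function with that normalization; the project/dataset IDs and the dd/MM/yyyy input format are placeholders, not values from the answer:

    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    /** Routes each row to tablename$yyyyMMdd, normalizing the date field first. */
    public class PartitionedDestination
            implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

        // dd/MM/yyyy is an assumed input format for "datefield"; adjust to the real data.
        private static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("dd/MM/yyyy");
        private static final DateTimeFormatter DECORATOR = DateTimeFormatter.ofPattern("yyyyMMdd");

        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            TableRow row = value.getValue();
            String partition = LocalDate.parse(row.get("datefield").toString(), INPUT)
                .format(DECORATOR);
            TableReference reference = new TableReference()
                .setProjectId("my-project")  // hypothetical project id
                .setDatasetId("my_dataset")  // hypothetical dataset id
                .setTableId("tablename$" + partition);
            return new TableDestination(reference, null);
        }
    }

It plugs in unchanged as rows.apply(BigQueryIO.writeTableRows().to(new PartitionedDestination())...), keeping the rest of the write configuration the same.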