I want to read a file and write its contents to a BigQuery partitioned table, routing each row to a partition based on a date value in one of the file's fields. For example, if the file contains rows dated July 25 and July 26, Dataflow should write that data into 2 partitions according to the dates present in the file. Loading a partitioned table with a Dataflow job.
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("");
        options.setTempLocation("gs://stage_location/");
        Pipeline p = Pipeline.create(options);

        // Schema of the destination table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("id").setType("STRING"));
        fields.add(new TableFieldSchema().setName("name").setType("STRING"));
        fields.add(new TableFieldSchema().setName("designation").setType("STRING"));
        fields.add(new TableFieldSchema().setName("joindate").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        PCollection<String> read = p.apply("Read Lines",
            TextIO.read().from("gs://hadoop_source_files/employee.txt"));

        // Parse each comma-separated line into a TableRow.
        PCollection<TableRow> rows = read.apply(ParDo.of(new DoFn<String, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] data = c.element().split(",");
                c.output(new TableRow()
                    .set("id", data[0])
                    .set("name", data[1])
                    .set("designation", data[2])
                    .set("joindate", data[3]));
            }
        }));

        rows.apply(BigQueryIO.writeTableRows()
            // Route each row to a table partition derived from its joindate field.
            .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
                public String getDate(String value) {
                    // The joindate value is appended to the "$" partition decorator as-is.
                    return "project:dataset.DataFlow_Test$" + value;
                }

                @Override
                public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                    TableRow row = value.getValue();
                    String tableSpec = getDate(row.get("joindate").toString());
                    String tableDescription = "";
                    return new TableDestination(tableSpec, tableDescription);
                }
            })
            .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                @Override
                public TableRow apply(TableRow input) {
                    // Elements are already TableRows; pass them through unchanged.
                    return input;
                }
            })
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
    }
}
When running the above program I get the following error: Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format: Caused by: java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format. Let me know if there are any suggestions.
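One likely cause of this error: the text after the "$" partition decorator must be a plain yyyyMMdd day, and the whole string must still match the project:dataset.table reference pattern. If the joindate field carries a delimited date such as "2017-07-25", the dashes make the reference fail validation. A minimal sketch of normalizing the field before building the destination; the "yyyy-MM-dd" input pattern and the project:dataset placeholders are assumptions, not taken from the original post:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionSpec {
    // Hypothetical helper: parse a "2017-07-25"-style joindate and re-emit it
    // in the yyyyMMdd form a BigQuery partition decorator expects.
    static String toPartitionSpec(String joindate) {
        LocalDate d = LocalDate.parse(joindate, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd.
        return "project:dataset.DataFlow_Test$" + d.format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    public static void main(String[] args) {
        System.out.println(toPartitionSpec("2017-07-25")); // project:dataset.DataFlow_Test$20170725
    }
}
```

In the pipeline above, the same conversion could be applied inside getDate() so every row's joindate yields a valid partition decorator.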
Welcome to Stack Overflow! Please use comments only for actual comments. You should [edit] the original question to add additional information. – DaveP