0
我對Google雲端平臺相當陌生,我首次嘗試使用Google Dataflow開展我的研究生課程項目。我想要做的是編寫一個自動加載作業,從我的Cloud Storage上的特定存儲區加載文件,並將其中的數據插入到BigQuery表中。Google Dataflow:PCollection <String>到PCollection <TableRow>適用於BigQuery插入
我得到的數據爲PCollection<String>
型,但對於插入BigQuery中的我顯然需要將其轉換爲一個PCollection<TableRow>
類型。到目前爲止,我還沒有找到一個可靠的答案來做到這一點。
這裏是我的代碼:
public static void main(String[] args) {
//Defining the schema of the BigQuery table
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT"));
fields.add(new TableFieldSchema().setName("MeterID").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
//Creating the pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
//Getting the data from cloud storage
PCollection<String> lines = p.apply(TextIO.Read.named("ReadCSVFromCloudStorage").from("gs://mybucket/myfolder/certainCSVfile.csv"));
//Probably need to do some transform here ...
//Inserting data into BigQuery
lines.apply(BigQueryIO.Write
.named("WriteToBigQuery")
.to("projectID:datasetID:tableID")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
}
我可能只是忘了一些基本的東西,所以我希望你們能幫助我解決這個...
你應該參考https://cloud.google.com/dataflow/model/par-do - 這顯示瞭如何使用變換一個帕爾多'PCollection' 。 –