我正在寫一個數據流的管道應做3兩件事:谷歌雲數據流:提交作業執行,但使用舊代碼
- 從GCP存儲器中讀取的.csv文件
- 解析數據至BigQuery campatible TableRows
- 將數據寫入BigQuery資料表
到目前爲止這一切工作就像一個魅力。它仍然如此,但是當我更改源變量和目標變量時,沒有任何變化。實際運行的工作是舊的,而不是最近更改(並提交)的代碼。不知何故,當我使用BlockingDataflowPipelineRunner從Eclipse運行代碼時,代碼本身並未上傳,但使用了較舊的版本。
通常沒有錯的代碼,但要儘可能完整:
public class BatchPipeline {
String source = "gs://sourcebucket/*.csv";
String destination = "projectID:datasetID.testing1";
//Creation of the pipeline with default arguments
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
.from(source));
@SuppressWarnings("serial")
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
@Override
public void processElement(ProcessContext c){
//processing code goes here
}
}));
//Defining the BigQuery table scheme
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
TableSchema schema = new TableSchema().setFields(fields);
String table = destination;
tablerows.apply(BigQueryIO.Write
.named("BigQueryWrite")
.to(table)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation());
//Runs the pipeline
p.run();
}
這種問題的出現,因爲我剛換的筆記本電腦,不得不重新配置應有盡有。我正在開發一個乾淨的Ubuntu 16.04 LTS OS,並且安裝了所有的GCP開發依賴項(通常)。通常情況下,一切都配置得相當好,因爲我能夠開始一項工作(如果我的配置出錯,這應該是不可能的,對吧?)。我使用Eclipse Neon btw。
那麼問題出在哪裏呢?在我看來,上傳代碼存在問題,但我已確保我的雲端git回購是最新的,並且分段存儲桶已被清理...
**** UPDATE ****
我從來沒有發現到底發生了什麼錯誤,但是當我檢出部署的jar中的文件的創建日期時,我確實看到它們從未真正更新過。然而,jar文件本身有一個最近的時間戳,這讓我完全忽略了這個問題(菜鳥錯誤)。
我最終通過在Eclipse中創建一個新的Dataflow項目並將我的.java文件從已損壞的項目複製到新項目中,最終獲得了所有工作。從那時起,一切都像個魅力一樣。
您是否在運行之前驗證了分段存儲桶是空的,並且在運行時填充了新的(有時間戳的)jar? –
是的,我已經嘗試了一個新的空桶。作業完成後,它填充了新的jar文件,但仍然沒有執行我的新代碼......不知道Dataflow沒有收到我實際上新編寫的代碼,這是我無法理解的。 – Matteus