0

我正在寫一個數據流的管道應做3兩件事:谷歌雲數據流:提交作業執行,但使用舊代碼

  • 從GCP存儲器中讀取的.csv文件
  • 解析數據至BigQuery campatible TableRows
  • 將數據寫入BigQuery資料表

到目前爲止這一切工作就像一個魅力。它仍然如此,但是當我更改源變量和目標變量時,沒有任何變化。實際運行的工作是舊的,而不是最近更改(並提交)的代碼。不知何故,當我使用BlockingDataflowPipelineRunner從Eclipse運行代碼時,代碼本身並未上傳,但使用了較舊的版本。

通常沒有錯的代碼,但要儘可能完整:

public class BatchPipeline { 
    String source = "gs://sourcebucket/*.csv"; 
    String destination = "projectID:datasetID.testing1";  

    //Creation of the pipeline with default arguments 
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create()); 

    PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage") 
      .from(source)); 

    @SuppressWarnings("serial") 
    PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){ 
     @Override 
     public void processElement(ProcessContext c){ 
      //processing code goes here 
     } 
    })); 

    //Defining the BigQuery table scheme 
    List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED")); 
    fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED")); 
    fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED")); 
    TableSchema schema = new TableSchema().setFields(fields); 
    String table = destination; 

    tablerows.apply(BigQueryIO.Write 
      .named("BigQueryWrite") 
      .to(table) 
      .withSchema(schema) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withoutValidation()); 

    //Runs the pipeline 
    p.run(); 
} 

這種問題的出現,因爲我剛換的筆記本電腦,不得不重新配置應有盡有。我正在開發一個乾淨的Ubuntu 16.04 LTS OS,並且安裝了所有的GCP開發依賴項(通常)。通常情況下,一切都配置得相當好,因爲我能夠開始一項工作(如果我的配置出錯,這應該是不可能的,對吧?)。我使用Eclipse Neon btw。

那麼問題出在哪裏呢?在我看來,上傳代碼存在問題,但我已確保我的雲端git回購是最新的,並且分段存儲桶已被清理...

**** UPDATE ****

我從來沒有發現到底發生了什麼錯誤,但是當我檢出部署的jar中的文件的創建日期時,我確實看到它們從未真正更新過。然而,jar文件本身有一個最近的時間戳,這讓我完全忽略了這個問題(菜鳥錯誤)。

我最終通過在Eclipse中創建一個新的Dataflow項目並將我的.java文件從已損壞的項目複製到新項目中,最終獲得了所有工作。從那時起,一切都像個魅力一樣。

+0

您是否在運行之前驗證了分段存儲桶是空的,並且在運行時填充了新的(有時間戳的)jar? –

+0

是的,我已經嘗試了一個新的空桶。作業完成後,它填充了新的jar文件,但仍然沒有執行我的新代碼......不知道Dataflow沒有收到我實際上新編寫的代碼,這是我無法理解的。 – Matteus

回答

1

提交Dataflow作業後,您可以通過檢查作業描述的一部分中的文件(可通過DataflowPipelineWorkerPoolOptions#getFilesToStage獲取)來檢查哪些工件是作業規範的一部分。下面的代碼片段給出了一些如何獲取這些信息的示例。

​​

上面的代碼應該打印出來是這樣的:

/my/path/to/file/dataflow.jar 
/another/path/to/file/myapplication.jar 
/a/path/to/file/alibrary.jar 

很可能是你的上傳已經過時了含舊代碼的一些方法作業的資源的一部分。查看暫存列表中的所有目錄和jar部分,並查找BatchPipeline的所有實例並驗證其年齡。可以使用jar工具或任何zip文件讀取器提取jar文件。或者,使用javap或任何其他class file inspector來驗證BatchPipeline類文件是否與您所做的預期更改一致。

+0

我對數據流相當陌生,所以我在那裏就像這樣的指南?我真的不知道如何開始調試這個問題...如果你有一些建議,拍攝因爲我可以使用一些體面的方法/技巧來在將來自己調試這些類型的問題。 – Matteus

+0

您是否試圖說我的答案確實解決了您的問題,並且您需要關於如何調試未來問題的一般指導,或者您是否想說我的答案沒有足夠的細節讓您能夠嘗試? –

+0

我確實可以使用一些關於如何調試我的代碼的指導,因爲我還是比較新的。你的解決方案可能會解決這個問題,但說實話,我不知道如何開始實施它......所以,如果你沒有太多麻煩,我會非常感謝你的答案更詳細的版本。提前致謝! – Matteus