2016-09-22 75 views
1

在我的項目中,我試圖向管道中處理的數據添加一些元數據。元數據位於src-folder旁邊名爲resources的子文件夾中的DBF文件中。在Google Dataflow中作爲DataflowPipelineRunner運行時訪問資源文件

src文件夾包含主要類,我有幾個包(IO,處理,聚合,utils)。

我在定義管道的主要類中讀取和處理帶有元數據的文件。我使用訪問該文件的代碼如下:

File temp1 = new File("resources/xxx.dbf"); 

我檢查,如果該文件是使用中發現:

LOG.info(temp1.exists()) 

其運行正常。

有消息作爲Strings發佈,我使用PubSubIO閱讀。我使用這個文件的內容來填充包含鍵和值的地圖。

Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations(); 

我然後設置在自定義類的靜態變量被稱爲「SensorValues」我提出:

SensorValue.setKeyToCoordinates(sensorToCoordinates); 

當從字符串到SensorValue-I類使用帕爾函數由解析傳入消息(從PCollection到PCollection),該映射用於SensorValue類的構造函數中。

使用DirectPipelineRunner運行此代碼非常完美。但是,當我使用DataflowPipelineRunner並嘗試訪問SensorValue構造函數中的映射時,我遇到了NullPointerException。

現在我想知道爲什麼setter在使用DataflowPipelineRunner時不工作(我猜它與在幾個worker中分配執行有關),以及使用任何靜態資源文件的最佳做法是什麼爲了豐富你的管道?

回答

1

你說得對,問題是因爲ParDo的執行被分發給多個工作者。他們沒有本地文件,他們可能沒有地圖的內容。

有幾個選擇這裏:

  1. 放入GCS文件,並有管道讀取文件(使用TEXTIO或類似的東西)中的內容,並把它作爲一個side-input您稍後處理。

  2. 將文件包含在管道資源中,並加載需要它的startBundle(將來會有辦法使這種情況發生的次數少於每個包)。

  3. 您可以將地圖內容序列化爲DoFn的參數,方法是將其作爲傳遞給該類構造函數的非靜態字段放入。

選項1是這個文件的大小增加更好(因爲它可以支持分裂它成若干小塊,做查詢),而方案2有可能減少網絡流量來檢索文件。選項3只在文件非常小的情況下才起作用,因爲它將顯着增加序列號DoFn的大小,這可能導致作業被提交給Dataflow服務。

+0

我試過第一個解決方案,它的工作原理。 我用'PCollectionView >'將鍵映射到正確的座標(與View.asMap()),就像一個魅力,感謝本! –