2017-09-15 90 views
0

我正在使用Apache Beam Java SDK創建Google數據流管道。我在那裏進行了一些轉換,最後我創建了一個實體集合(PCollection < Entity>)。我需要將它寫入Google DataStore,然後在所有實體寫入後執行另一次轉換。 (例如通過PubSub消息將保存的對象的ID廣播給多個訂閱者)。DataStoreIO.Write鏈接另一個轉換

現在,存儲PCollection的方法是:。 entities.DatastoreIO.v1()寫()withProjectId( 「ABC」)

這會返回一個PDone對象,我不知道怎麼樣在Write()完成後,我可以鏈接另一個轉換。由於DatastoreIO.write()調用不返回PCollection,因此我無法繼續使用管道。我有兩個問題:

  1. 如何獲取寫入數據存儲的對象的ID?

  2. 如何附加另一個將在所有實體保存後執行的轉換?

回答

1

我們沒有一個很好的方法做任何的這些東西(返回寫出的數據存儲實體的標識,或等待,直到實體已被寫入),雖然這是遠從第一類似要求(人例如,BigQuery問這個問題),我們正在考慮這個問題。

現在,您唯一的選擇是等待整個管道完成,例如,通過pipeline.run().waitUntilFinish(),然後在你的主程序中做你想要的(例如你可以運行另一個管道)。

+0

感謝您的回答。我會嘗試一下。只是爲了確認我的理解:我們可以有p1.run()。waitUntilFinish(),然後有p2.run()..在這種情況下,管道p2將在p1完成後啓動。那是對的嗎 ? – Venky

+0

我試過了,它在我使用BlockingDataflowPipelineRunner時工作但是當我使用使用模板時(參考:https://cloud.google.com/dataflow/docs/templates/overview),我不知道如何使它工作。我猜一個模板文件只與一個管道相關聯?我如何創建一個將創建管道,執行它,等待它完成,然後在完成後再啓動另一個管道的模板? – Venky

+0

這是不可能的 - 1個模板是1個管道。 – jkff