我正在使用Apache Beam Java SDK創建Google數據流管道。我在那裏進行了一些轉換,最後我創建了一個實體集合(PCollection < Entity>)。我需要將它寫入Google DataStore,然後在所有實體寫入後執行另一次轉換。 (例如通過PubSub消息將保存的對象的ID廣播給多個訂閱者)。DataStoreIO.Write鏈接另一個轉換
現在,存儲PCollection的方法是:。 entities.DatastoreIO.v1()寫()withProjectId( 「ABC」)
這會返回一個PDone對象,我不知道怎麼樣在Write()完成後,我可以鏈接另一個轉換。由於DatastoreIO.write()調用不返回PCollection,因此我無法繼續使用管道。我有兩個問題:
如何獲取寫入數據存儲的對象的ID?
如何附加另一個將在所有實體保存後執行的轉換?
感謝您的回答。我會嘗試一下。只是爲了確認我的理解:我們可以有p1.run()。waitUntilFinish(),然後有p2.run()..在這種情況下,管道p2將在p1完成後啓動。那是對的嗎 ? – Venky
我試過了,它在我使用BlockingDataflowPipelineRunner時工作但是當我使用使用模板時(參考:https://cloud.google.com/dataflow/docs/templates/overview),我不知道如何使它工作。我猜一個模板文件只與一個管道相關聯?我如何創建一個將創建管道,執行它,等待它完成,然後在完成後再啓動另一個管道的模板? – Venky
這是不可能的 - 1個模板是1個管道。 – jkff