批處理Dataflow作業處理完所有數據後是否可以執行操作?具體來說,我想將剛剛處理的流水線的文本文件移動到不同的GCS存儲桶中。我不確定將數據放入管道中的位置,以確保在數據處理完成後執行一次。在Dataflow流水線處理完所有數據後執行操作
0
A
回答
1
我不明白爲什麼你需要做這個後管道執行。您可以使用側面輸出將文件寫入多個存儲桶,並在管道完成後保存自己的副本。
如果這不適合你(無論什麼原因),那麼你可以簡單地在blocking execution模式下運行你的管道,即使用pipeline.run().waitUntilFinish()
,然後在那之後寫下你的代碼的其餘部分。
[..]
/do some stuff before the pipeline runs
Pipeline pipeline = ...
pipeline.run().waitUntilFinish();
//do something after the pipeline finishes here
[..]
-1
我覺得兩個選項可以幫助你在這裏:
1)使用TextIO寫入桶或你想要的,指定的確切GCS路徑(如GS文件夾://沙箱/其他桶)
2)使用Object Change Notifications結合Cloud Functions。你可以在這個here和JS的GCS SDK中找到一個很好的入門書。在這個選項中你將做的事情基本上是在某物桶落下時設置一個觸發器,並使用自己寫的雲功能將其移動到另一個。
相關問題
- 1. Dataflow流水線在執行GroupBy之前等待來自所有流的元素
- 2. label.setVisible(true)在處理完成後纔會執行任何操作
- 3. 在流水線執行中採用並行處理
- 4. Dataflow TPL使用預先條件執行流水線
- 5. 在所有行完成執行而沒有最終執行後處理異常
- 6. 所有異步處理程序完成後執行javascript函數
- 7. 在處理完所有數據後觸發作業
- 8. 完成observable完成後要執行的操作後,如何執行操作?
- 9. 嘗試計算執行五級流水線處理器
- 10. 保留執行流水線
- 11. 在完成操作後執行某些操作
- 12. startActivity完成後執行某些操作
- 13. AsyncTask完成後執行操作
- 14. 篩選數據後執行操作
- 15. 數據渲染後執行操作
- 16. hadoop-streaming:一旦工作完成後自動執行後處理?
- 17. 流水線中的多處理完成正確
- 18. 使用DataFlow和RX的連續數據流停止處理
- 19. R使用匿名函數進行流水線操作
- 20. Artifactory作爲Jenkins流水線中的後期構建操作
- 21. HTTP流水線和錯誤處理
- 22. 文本處理流水線文件
- 23. Redis流水線,處理緩存丟失
- 24. VHDL中的圖像處理流水線
- 25. arm組裝中的流水線處理
- 26. 我如何確保在所有線程完成執行後執行語句
- 27. 後臺工作線程中的函數調用只有在主線程執行完成後才能執行?
- 28. C#在BackgroundWorker後執行一些操作RunWorker完成完成
- 29. 批量執行流操作
- 30. 當數據庫完成處理/加載數據時執行宏
謝謝。使用'BlockingDataflowPipelineRunner'運行這個工作就可以實現。 'waitUntilFinish()'似乎在1.x Java API中不可用。 – user01380121
正確,不是。您在1.x中使用Blocking runner和wait/poll –