2017-06-05 62 views
1

我想在管道完成所有處理後運行一些代碼,因此我使用BlockingDataflowPipelineRunner並在的main之後放置代碼。使用BlockingDataflowPipelineRunner和數據流模板的後處理代碼

這工作正常,當我運行使用BlockingDataflowPipelineRunner命令行工作。流水線完成處理後,運行pipeline.run()下的代碼。

然而,當我嘗試運行作業,模板它不工作。我部署的工作爲模板(與TemplatingDataflowPipelineRunner),然後試圖在雲功能像這樣運行模板:

dataflow.projects.templates.create({ 
    projectId: 'PROJECT ID HERE', 
    resource: { 
     parameters: { 
      runner: 'BlockingDataflowPipelineRunner' 
     }, 
     jobName: `JOB NAME HERE`, 
     gcsPath: 'GCS TEMPLATE PATH HERE' 
    } 
}, function(err, response) { 
    if (err) { 
     // etc 
    } 
    callback(); 
}); 

亞軍似乎並沒有採取。我可以把胡言亂語放在跑壘員的位置,這份工作還在繼續。

pipeline.run()下當每個作業運行不運行的代碼 - 它只有當我部署模板運行。

預計main中的pipeline.run()下的代碼在每次作業運行時都不運行?管道結束後是否有執行代碼的解決方案?

(爲背景,pipeline.run()後的代碼移動從一個雲存儲桶到另一個文件。它的歸檔剛由作業處理的文件。)

回答

1

是的,這個預期的行爲。模板表示管道本身,並允許通過啓動模板(重新)執行管道。由於該模板不包含來自main()方法的任何代碼,因此它不允許在管道執行後執行任何操作。

同樣,dataflow.projects.templates.create API只是推出模板API。

阻塞亞軍完成這個問題的方法是從已創建管道落榜ID,當它已經完成定期輪詢來觀察。對於您的用例,您需要執行相同的操作:

  1. 執行dataflow.projects.templates.create(...)以創建Dataflow作業。這應該返回作業ID。
  2. 定期(每5-10s,例如)查詢dataflow.projects.jobs.get(...)檢索與給定ID的工作,並檢查它處於什麼狀態。