2017-06-05 41 views
0

我有一個DAG,並行扇出到多個獨立單元。這在AWS中運行,因此我們擁有的任務可以將我們的AutoScalingGroup擴展到DAG啓動時的最大工作人員數量,並在DAG完成時達到最小數量。簡化的版本是這樣的:氣流 - 運行任務,無論上游成功/失敗

  | - - taskA - - | 
      |    | 
scaleOut - | - - taskB - - | - scaleIn 
      |    | 
      | - - taskC - - | 

然而,一些在平行設置的任務失敗偶爾,我不能得到按比例縮小的任務時,任何的A-C任務無法運行。

一旦所有其他任務完成(成功或失敗),在DAG結束時執行任務的最佳方式是什麼? depends_on_upstream設置聽起來像我們需要的,但實際上沒有基於測試做任何事情。

回答

3

所有操作員都有一個參數trigger_rule,它可以設置爲'all_done',這將觸發該任務,而不管前一個任務的失敗或成功。

您可以將要運行的任務的觸發器規則設置爲'all_done'而不是缺省'all_success'

這樣的說法一個簡單的bash運營商的任務看起來像:

task = BashOperator(
    task_id="hello_world", 
    bash_command="echo Hello World!", 
    trigger_rule="all_done", 
    dag=dag 
    ) 
相關問題