2017-06-14 36 views
0

下一個順序執行的順序運行,我已經在那裏我指定的需要被順序運行三個任務DAG的文件(T1 - > T2 - > T3):確保任務(Apache的氣流)

default_args = { 
    'owner': 'airflow', 
    'start_date': datetime(2017, 6, 14, 23 , 20), 
    'email_on_failure': False, 
    'email_on_retry': False, 
    } 

dag = DAG('test_dag', default_args=default_args, schedule_interval="*/5 * * * *") 

t1 = BashOperator(
    task_id='form_dataset', 
    bash_command='python 1.py', 
    dag=dag) 

t2 = BashOperator(
    task_id='form_features', 
    bash_command='python 2.py', 
    depends_on_past=True, 
    dag=dag) 

t3 = BashOperator(
    task_id='train', 
    bash_command='python 3.py', 
    depends_on_past=True, 
    dag=dag) 

t2.set_upstream(t1) 
t3.set_upstream(t2) 
t4.set_upstream(t3) 

我假設順序行爲t1-> t2-> t3是一個默認的行爲,我認爲情況並非如此(順序非常隨機,例如t1 - > t2 - > t2- - > T1 - > T3)。我錯過了什麼樣的論點會糾正這種行爲?

回答

1

您需要在文件的末尾添加語句

t1 >> t2 >> t3 

。有關詳細信息,請參見以下鏈接: https://airflow.incubator.apache.org/concepts.html#bitshift-composition

爲了完整起見,還可以通過對任務使用set_upstream()或set_downstream()方法來完成此操作。

+0

感謝您的回覆,我確實收到了回覆(抱歉,我沒有將它寫入代碼中)。我想問題是,所有三個任務的總運行時間大於計劃間隔。我想優先執行DAG中的所有任務,而不是運行新的DAG,但到目前爲止沒有找到相關的屬性。 –

+0

每個DAG運行的執行都取決於上次運行的成功完成嗎?在這種情況下,您可以查看參數depends_on_true的行爲。還有另一個選項可以將每個任務分配給一個池,並將該池的大小限制爲1.或者我誤解了您的用例?一般來說,dag任務應儘可能獨立於其他運行。 – Him

+0

在airflow.cfg中有一個設置max_active_runs_per_dag。將其設置爲1還應該防止在前一個完成之前重新開始相同的dag。 – Him