3
有沒有什麼辦法可以在不進行多任務的情況下依次運行回填?例如,如果我使用多個日期運行回填,例如 氣流回填[dag] -s「2017-07-01」-e「2017-07-10」,有什麼方法可以在跑到下一個之前完成每個DAG天?現在,它正在完成每項任務的所有日子,然後再進行下一項任務。Airflow - BACKFILLING
謝謝。
有沒有什麼辦法可以在不進行多任務的情況下依次運行回填?例如,如果我使用多個日期運行回填,例如 氣流回填[dag] -s「2017-07-01」-e「2017-07-10」,有什麼方法可以在跑到下一個之前完成每個DAG天?現在,它正在完成每項任務的所有日子,然後再進行下一項任務。Airflow - BACKFILLING
謝謝。
您可以將DAG的max_active_runs
參數設置爲1,以確保只有一個運行該DAG的DAG可以同時進行調度。 https://pythonhosted.org/airflow/code.html?highlight=concurrency#models
如果您需要整個DAG是前進的,你可以添加一個ExternalTaskSensor
您DAG的開始和結束時的DummyOperator
收集任務之前完成。然後將ExternalTaskSensor設置爲在上一次運行結束時在DummyOperator上觸發。
dag = DAG(dag_id='dag')
wait_for_previous_operator = ExternalTaskSensor(\
task_id='wait_for_previous',
external_dag_id='dag',
external_task_id='collection',
execution_delta=schedule_interval,
dag=dag)
collection_operator = DummyOperator(\
task_id='collection',
dag=dag)
wait_for_previous_operator.set_downstream(your_other_tasks_list)
collection_operator.set_upstream(your_other_tasks_list)
嗨。 max_active_runs已經設置爲1. 事實上,當時只有一個dag運行。但它會在繼續執行隊列中的下一個任務之前嘗試完成同一類型的所有任務(在所有日期中)。 –
我更新了我的回答以反映您的意見 –
兩者都在同一個DAG中?讓我嘗試一下。我認爲ExternalTaskSensors應該用於不同的dag。讓我們來看看! –