我們最近試圖採用Airflow作爲我們的「數據工作流程」引擎,儘管我已經將大部分內容弄清楚了,但我仍然處於調度程序如何計算何時觸發DAG的灰色地帶。氣流DAG觸發
在這個簡單的DAG請看:
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.now()
}
with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='date',
dag=dag)
時間表會挑這個,但不會執行它。現在,如果我將「start_date」更改爲:
其中xxxx,yyyy,zzzz是今天的日期,它將開始執行。原因是調度程序不斷從源dag文件夾重新讀取此dag文件,每次執行datetime.now(),注意到開始日期與當前排隊不同,重新添加此dag並因此重新調度/推動執行日期前進(我的dag_dir_list_interval是300)?
- 第一DAG執行:
而且,在氣流,我理解,當一個DAG是取消暫停(或者與dags_are_paused_at_creation加入=假),則調度器將如下安排執行即時後(起始+間隔)
- 第二DAG執行:時刻之後(開始+(間隔* 2))
- 第三DAG執行:時刻之後(開始+(間隔* 3))
這是正確的假設嗎?
更新(2017年7月30日)
基於這樣的假設之上,我創造了這個DAG今日(2017年7月30日):
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
dag_options = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date':
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}
with DAG('test_dag_100', schedule_interval="*/10 * * * *",
default_args=dag_options) as dag:
task1 = BashOperator(
task_id='task_100',
bash_command='date',
dag=dag)
應該開始(UTC ):
- 2017年7月30日20時20分00秒
- 2017年7月30日二十點30分00秒
- 2017/7/30 20:40:00
不幸的是,這沒有發生。 這裏是我的儀表板的一些屏幕截圖:
有人能解釋爲什麼二十點21分00秒達格沒有執行? 20:31:00之後它仍然沒有執行......我在這裏錯過了什麼?順便說一句,我還注意到,出於某種原因,我每次去 手動通過儀表板啓動一個DAG,它只是在「運行」階段。爲什麼是這樣?手動啓動它與任何啓動時間選項(start_date/interval/etc)有什麼關係?
謝謝您提供的任何說明
計劃間隔是crontab中,你可以嘗試https://crontab.guru/測試對你意味着什麼區間。如果使用的是1.8 +,datetime.now()不被認爲是最好的做法,你可以在這裏找到https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#less-forgiving-詳細調度程序上動態起始日期 – Chengzhi