Python版本2.7.x和氣流1.5.1版
我DAG的腳本是這樣的
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
從,你可以看到我正在創建一個帶有6個任務的DAG第一個任務(Start1)首先啓動,之後所有其他五個任務開始
目前我已經給DAG的首發
它已經完全跑了所有六個任務的第一種類型,但五分鐘後DAG不重新啓動
至今已有然後1之間的時間5分鐘延遲小時仍然DAG不重新啓動我真的不知道我是錯的。
如果有人能指出我有什麼不對,那我真的很高興。我嘗試用airflow testing clear
清除,然後發生同樣的事情。它跑了一次然後就站在那裏。
在命令行中顯示的唯一事情是Getting all instance for DAG testing
當我改變schedule_interval只是它與任何計劃間隔parallel.That是在5分鐘內300個或更多的任務實例已完成運行的位置。有NO 5分鐘的調度時間間隔
代碼2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'Vignesh',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
感謝維涅什,
所以你說它會每五秒運行一次,直到執行日期爲當前日期ti我之後它會按照預定的時間間隔 – The6thSense
是的,這就是我的意思。 – Yongyiw
非常感謝,但我有兩個疑惑。我怎麼能安排一個任務從這秒開始,時間間隔爲一個小時。我可以安排一個未來的工作 – The6thSense