2015-10-14 83 views
8

代碼:氣流不調度正確的Python

Python版本2.7.x和氣流1.5.1版

我DAG的腳本是這樣的

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'schedule_interval':timedelta(minutes=5), 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing', default_args=default_args) 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

從,你可以看到我正在創建一個帶有6個任務的DAG第一個任務(Start1)首先啓動,之後所有其他五個任務開始

目前我已經給DAG的首發

它已經完全跑了所有六個任務的第一種類型,但五分鐘後DAG不重新啓動

至今已有然後1之間的時間5分鐘延遲小時仍然DAG不重新啓動我真的不知道我是錯的。

如果有人能指出我有什麼不對,那我真的很高興。我嘗試用airflow testing clear清除,然後發生同樣的事情。它跑了一次然後就站在那裏。

在命令行中顯示的唯一事情是Getting all instance for DAG testing

當我改變schedule_interval只是它與任何計劃間隔parallel.That是在5分鐘內300個或更多的任務實例已完成運行的位置。有NO 5分鐘的調度時間間隔

代碼2:

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

感謝維涅什,

回答

4

代碼2,我猜測爲什麼它運行每分鐘的原因是:

  1. 開始時間爲2015-10-13 00:00

  2. 程序間隔爲5分鐘

  3. 調度的每一次心跳(缺省爲5秒),您的DAG將被選中

    • 首先檢查:開始日期(沒有最後執行日期發現)+調度 當前時間間隔<?如果是,則將執行DAG並記錄最後的執行時間 。 (例如:2015-10-13 00:00 + 5min < current?)
    • 第二次檢查下次心跳:上次執行時間+調度程序 區間<當前時間?如果是的話,DAG將被再次執行。
    • ....

將該溶液設置DAG作爲起始日期datetime.now() - schedule_interval

和如果要調試:

  1. settings.py中的LOGGINGLEVEL設置爲debug

  2. 修改類方法的airflow.models.TaskInstanceis_queueable()

def is_queueable(self, flag_upstream_failed=False): 
    logging.debug('Checking whether task instance is queueable or not!') 
    if self.execution_date > datetime.now() - self.task.schedule_interval: 
     logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now())) 
     return False 
     ... 
+0

所以你說它會每五秒運行一次,直到執行日期爲當前日期ti我之後它會按照預定的時間間隔 – The6thSense

+0

是的,這就是我的意思。 – Yongyiw

+0

非常感謝,但我有兩個疑惑。我怎麼能安排一個任務從這秒開始,時間間隔爲一個小時。我可以安排一個未來的工作 – The6thSense

3

由於開始時間(2015-10-13 00:00)小於現在時間,它會觸發氣流backfill。它將從2015-10-13 00:00開始,當氣流調度程序檢測到每秒鐘(它的開始日期),但執行日期在5分鐘(任務間隔時間)之間。

查看日誌名稱:

$tree airflow/logs/testing/ 
testing/ 
|-- Orders10 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders11 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders12 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders13 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders14 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
-- Start1 
    |-- 2015-10-13T00:00:00 
    |-- 2015-10-13T00:05:00 
    |-- 2015-10-13T00:10:00 
    -- 2015-10-13T00:15:00 

見日誌的創建時間:

$ll airflow/logs/testing/Start1 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00 

此外,你可以看到在網絡用戶界面任務實例:

air flow Task Instances

+0

是的,你是對的 – The6thSense