2016-09-06 61 views
0

我創建了一個DAG,其中包含爲每個工作流調度@daily間隔和單獨的任務ID。但它沒有按照例外運行。有沒有可能這樣做?有沒有其他方法可以爲特定的DAG創建動態任務?並使用命令行暫停特定任務實例?動態創建的任務/ dag在apache airflow中不工作

from __future__ import print_function 
from builtins import range 
from airflow.operators import PythonOperator,DummyOperator,BranchPythonOperator,SqlSensor 
from airflow.models import DAG 
from datetime import datetime, timedelta 

import time 
from pprint import pprint 

seven_days_ago = datetime.combine(
     datetime.today() - timedelta(7), datetime.min.time()) 

args = { 
    'owner': 'varakumar', 
    'start_date': seven_days_ago, 
} 

dag = DAG(
    dag_id='dynamic_task_creation', default_args=args, 
    schedule_interval="@daily") 

def get_decision(): 
    return "right" 

start = DummyOperator(
    task_id='start', 
    dag=dag) 

td=datetime.today() 
x=str(datetime(td.year,td.month,td.day,td.hour,td.minute,td.second)).replace (" ", "_").replace (":", "-") 
pause_task_id = ("pause-%s" % x) 

pause = DummyOperator(
    task_id=pause_task_id, 
    dag=dag) 
pause.set_upstream(start) 

decision = BranchPythonOperator(
    task_id='decision', 
    python_callable=lambda: get_decision(), 
    dag=dag) 
decision.set_upstream(pause) 

left = DummyOperator(
    task_id='left', 
    dag=dag) 
left.set_upstream(decision) 

right = DummyOperator(
    task_id='right', 
    dag=dag) 
right.set_upstream(decision) 

預先感謝您

+0

您是如何預期它運行的?它是如何運行的? – Mercury

回答

0

我看到的第一個問題是,您使用的是動態start_date。我在做這件事時看到了一些奇怪的行爲,我認爲這是基於氣流保持過去的重大清單的方式。嘗試指定一個固定的start_date,看看是否能解決任何問題。

在任何一種情況下,氣流文檔advise against dynamic start dates(向下滾動一下並閱讀start_date的說明)。

編輯:另請查看this瞭解更多信息。

相關問題