2017-08-08 91 views
2

我發現下面的鏈接:我如何觸發氣流-dag使用TriggerDagRunOperator

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

這的確說明了如何使用TriggerDagRunOperator來執行不同的氣流DAG。文檔使用了Airflow自己的示例dags,但我很難理解那些因爲他們沒有使用任何傳感器。

有人可以解釋如何使用TriggerDagRunOperatorSqlSensor開始單獨的dag嗎?我試圖在我的SQL Server作業任務完成時啓動單獨的DAG。我知道如何通過使用SqlSensor來檢查SQL Server作業的狀態,但我不知道如何將結果附加到TriggerDagRunOperator以啓動單獨的DAG。

我不想使用Airflow CLI或在一個DAG中執行這兩項任務。基本上,我想這只是觸發dag而已。

下面是我當前的代碼,這是缺少關鍵conditionally_trigger

# File Name: check-when-db1-sql-task-is-done 

from airflow import DAG 
from airflow.operators import TriggerDagRunOperator 
from airflow.operators import SqlSensor 
from datetime import datetime 


default_args = { 
     'owner': 'airflow', 
     'retry_delay': timedelta(minutes=5), 
} 

dag = DAG('check-when-db1-sql-task-is-done', 
     description='Check-when-DB1-SQL-task-is-done', 
     default_args=default_args, 
     schedule_interval='@once', 
     start_date=datetime.now(), 
     ) 

# returns-0-or-1-based-on-job-task-status 
sqlsensor = SqlSensor (
     task_id='sql-sensor', 
     poke_interval=30, 
     timeout=3200, 
     sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,  
     mssql_conn_id='db1', 
     dag=dag, 
     ) 

# dag-to-start 
trigger = TriggerDagRunOperator (
     task_id='start-ssh-job', 
     trigger_dag_id="qa-knime-ssh-task", 
     python_callable=conditionally_trigger, 
     params={'condition_param': True, 
       'message': 'Hello World'}, 
     dag=dag) 

回答

0

我的理解是,TriggerDagRunOperator是當你想使用Python函數,以確定是否要觸發SubDag。在你的代碼和例子中這個函數被稱爲conditionally_trigger

在你的情況下,你使用傳感器來控制流量,不需要傳遞函數。你可以使用的,而不是SubDagOperatorTriggerDagRunOperator通過一個簡單的永遠真函數作爲python_callable

... 
python_callable=lambda(context, dag_run_obj):dag_run_obj, 
...