2017-04-11 77 views
2

當我們做一個dagrun時,在Airflow的UI上,在「圖形視圖」中,我們可以看到每個作業的詳細信息。如何獲得airflow dag運行的JobID?

JobID類似於「scheduled__2017-04-11T10:47:00」

我需要這個JobID進行跟蹤和日誌創建,其中我保持每個任務/ dagrun所花費的時間。

所以我的問題是我該如何獲得正在運行的同一個DAG中的JobID

感謝,阿赫亞

回答

2

這個值實際上是所謂run_id,可以通過上下文或宏訪問。

在python運算符中,這是通過上下文訪問的,在bash運算符中,這是通過bash_command字段上的jinja模板來訪問的。

什麼在宏的可用

更多信息:在神社

https://airflow.incubator.apache.org/code.html#macros

更多信息:

https://airflow.incubator.apache.org/concepts.html#jinja-templating

from airflow.models import DAG 
from datetime import datetime 
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.python_operator import PythonOperator 


dag = DAG(
    dag_id='run_id', 
    schedule_interval=None, 
    start_date=datetime(2017, 2, 26) 
) 

def my_func(**kwargs): 
    context = kwargs 
    print(context['dag_run'].run_id) 

t1 = PythonOperator(
    task_id='python_run_id', 
    python_callable=my_func, 
    provide_context=True, 
    dag=dag 
    ) 

t2 = BashOperator(
    task_id='bash_run_id', 
    bash_command='echo {{run_id}}', 
    dag=dag) 

t1.set_downstream(t2) 

使用此DAG作爲一個例子,並檢查日誌每個操作員,你都應該看到日誌中印有run_id

相關問題