2
當我們做一個dagrun時,在Airflow的UI上,在「圖形視圖」中,我們可以看到每個作業的詳細信息。如何獲得airflow dag運行的JobID?
JobID類似於「scheduled__2017-04-11T10:47:00」。
我需要這個JobID進行跟蹤和日誌創建,其中我保持每個任務/ dagrun所花費的時間。
所以我的問題是我該如何獲得正在運行的同一個DAG中的JobID。
感謝,阿赫亞
當我們做一個dagrun時,在Airflow的UI上,在「圖形視圖」中,我們可以看到每個作業的詳細信息。如何獲得airflow dag運行的JobID?
JobID類似於「scheduled__2017-04-11T10:47:00」。
我需要這個JobID進行跟蹤和日誌創建,其中我保持每個任務/ dagrun所花費的時間。
所以我的問題是我該如何獲得正在運行的同一個DAG中的JobID。
感謝,阿赫亞
這個值實際上是所謂run_id
,可以通過上下文或宏訪問。
在python運算符中,這是通過上下文訪問的,在bash運算符中,這是通過bash_command
字段上的jinja模板來訪問的。
更多信息:在神社
https://airflow.incubator.apache.org/code.html#macros
更多信息:
https://airflow.incubator.apache.org/concepts.html#jinja-templating
from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='run_id',
schedule_interval=None,
start_date=datetime(2017, 2, 26)
)
def my_func(**kwargs):
context = kwargs
print(context['dag_run'].run_id)
t1 = PythonOperator(
task_id='python_run_id',
python_callable=my_func,
provide_context=True,
dag=dag
)
t2 = BashOperator(
task_id='bash_run_id',
bash_command='echo {{run_id}}',
dag=dag)
t1.set_downstream(t2)
使用此DAG作爲一個例子,並檢查日誌每個操作員,你都應該看到日誌中印有run_id
。