4
在我的DAG文件中,我定義了一個on_failure_callback()函數來發布失敗時的Slack。Airflow default on_failure_callback
它工作得很好,如果我指定我的DAG每個運營商:on_failure_callback = on_failure_callback()
有沒有一種方法(例如,或通過我的DAG對象通過default_args)自動分派給我的所有運營商?
在我的DAG文件中,我定義了一個on_failure_callback()函數來發布失敗時的Slack。Airflow default on_failure_callback
它工作得很好,如果我指定我的DAG每個運營商:on_failure_callback = on_failure_callback()
有沒有一種方法(例如,或通過我的DAG對象通過default_args)自動分派給我的所有運營商?
我終於找到了一種方法來做到這一點。
你可以通過你的on_failure_callback作爲default_args
class Foo:
@staticmethod
def get_default_args():
"""
Return default args
:return: default_args
"""
default_args = {
'on_failure_callback': Foo.on_failure_callback
}
return default_args
@staticmethod
def on_failure_callback(context):
"""
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
operator = SlackAPIPostOperator(
task_id='failure',
text=str(context['task_instance']),
token=Variable.get("slack_access_token"),
channel=Variable.get("slack_channel")
)
return operator.execute(context=context)
有趣的問題,該on_failure_callback在BaseOperator定義,我能想到的唯一的辦法就是創建自己的操作,並從BaseOperator繼承,然後通過你的on_failure_callback () 那裏。想看看別人怎麼看 – Chengzhi
感謝您的意見,但我對改變基本操作系統的基本知識沒有信心。我更喜歡手動添加到每個操作員,但不要錯過BaseOperator的更新(少維護) –