2017-07-25 201 views
4

在我的DAG文件中,我定義了一個on_failure_callback()函數來發布失敗時的Slack。Airflow default on_failure_callback

它工作得很好,如果我指定我的DAG每個運營商:on_failure_callback = on_failure_callback()

有沒有一種方法(例如,或通過我的DAG對象通過default_args)自動分派給我的所有運營商?

+0

有趣的問題,該on_failure_callback在BaseOperator定義,我能想到的唯一的辦法就是創建自己的操作,並從BaseOperator繼承,然後通過你的on_failure_callback () 那裏。想看看別人怎麼看 – Chengzhi

+0

感謝您的意見,但我對改變基本操作系統的基本知識沒有信心。我更喜歡手動添加到每個操作員,但不要錯過BaseOperator的更新(少維護) –

回答

5

我終於找到了一種方法來做到這一點。

你可以通過你的on_failure_callback作爲default_args

class Foo: 
    @staticmethod 
    def get_default_args(): 
     """ 
     Return default args 
     :return: default_args 
     """ 

     default_args = { 
      'on_failure_callback': Foo.on_failure_callback 
     } 

     return default_args 

    @staticmethod 
    def on_failure_callback(context): 
    """ 
    Define the callback to post on Slack if a failure is detected in the Workflow 
    :return: operator.execute 
    """ 

    operator = SlackAPIPostOperator(
     task_id='failure', 
     text=str(context['task_instance']), 
     token=Variable.get("slack_access_token"), 
     channel=Variable.get("slack_channel") 
    ) 

    return operator.execute(context=context)