2017-06-16 48 views
0

如何配置Airflow,以便DAG中的任何故障將(立即)導致鬆弛消息?氣流故障鬆弛消息

此時此刻我通過創建一個slack_failed_task對其進行管理:

slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed', 
    channel="#datalabs", 
    trigger_rule='one_failed', 
    token="...", 
    text = ':red_circle: DAG Failed', 
    icon_url = 'http://airbnb.io/img/projects/airflow3.png', 
    dag=dag) 

而且從DAG中的每個其他任務上游設置這個任務(one_failed):

slack_failed_task << download_task_a 
slack_failed_task << download_task_b 
slack_failed_task << process_task_c 
slack_failed_task << process_task_d 
slack_failed_task << other_task_e 

它的工作原理,但它的容易出錯,因爲忘記添加任務將跳過鬆散的通知,看起來像很多工作。

有沒有辦法擴展DAG中的email_on_failure屬性?

Bonus ;-)用於包含一種將失敗任務的名稱傳遞給消息的方法。

回答

5

可能會有所幫助:

def slack_failed_task(contextDictionary, **kwargs): 
     failed_alert = SlackAPIPostOperator(
     task_id='slack_failed', 
     channel="#datalabs", 
     token="...", 
     text = ':red_circle: DAG Failed', 
     owner = '_owner',) 
     return failed_alert.execute 


task_with_failed_slack_alerts = PythonOperator(
task_id='task0', 
python_callable=<file to execute>, 
on_failure_callback=slack_failed_task, 
provide_context=True, 
dag=dag) 
2

的BaseOperator支持 'on_failure_callback' 參數:

on_failure_callback(贖回) - 當這個任務的任務實例失敗的函數被調用。上下文字典作爲單個參數傳遞給此函數。上下文包含對相關對象對任務實例的引用,並記錄在API的宏部分下。

我沒有測試過這一點,但你應該能夠定義一個函數職位懈怠失敗這並把它傳遞給每個任務定義。要獲取當前任務的名稱,可以使用{{task_id}}模板。

+0

我不能得到'{{TASK_ID}}'又工作,但你的幫助表示讚賞 –