2017-06-13 219 views
6

我很努力去理解Airflow中的BranchPythonOperator是如何工作的。我知道它主要用於分支,但是文檔混淆了什麼要傳入任務以及我需要從上游任務傳遞/期望什麼。Airflow的BranchPythonOperator如何工作?

考慮到文檔on this page中的一個簡單示例,上游任務run_this_first和下游2個分支的源代碼的外觀如何? Airflow知道如何運行branch_a而不是branch_b?上游任務的輸出在哪裏被注意/讀取?

回答

7

您的BranchPythonOperator是使用python_callable創建的,這是一個函數。該函數將根據您的業務邏輯返回您已連接的直接下游任務的任務名稱。這可能是緊接下游的1到N個任務。沒有什麼是下游任務HAVE要讀取,但是您可以使用xcom傳遞它們的元數據。

def decide_which_path(): 
    if something is True: 
     return "branch_a" 
    else: 
     return "branch_b" 


branch_task = BranchPythonOperator(
    task_id='run_this_first', 
    python_callable=decide_which_path, 
    trigger_rule="all_done", 
    dag=dag) 

branch_task.set_downstream(branch_a) 
branch_task.set_downstream(branch_b) 

設置trigger_rule或所有其餘的將被跳過,因爲默認是all_success是很重要的。

+0

對於trigger_rule,這仍然是真的嗎?文檔不建議你需要,但只是一個虛構的任務,因爲其他任務(除了函數返回的那個)將立即下游跳過https://airflow.incubator.apache.org/concepts.html #分支 – Davos

+0

是的,這是正確的,所以它取決於下游任務如何連接。我認爲我認爲所有的分支都合併到主線任務中,但這可能甚至不是正常的用例(但這是我的正常用例)。 – Nick

+0

您的興趣愛好是什麼?我目前使用的是'這個文件是否存在',如果不存在,繼續創建它,否則虛擬任務就會成功退出。它專門用於將一些靜態數據(從不會更改)從SQL數據庫加載到hadoop。我希望它具有冪等性,並且非常快速地禁止,如果不需要,可以完全避免對源數據庫的查詢影響。 – Davos