2017-02-17 40 views
3

因爲我剛剛開始使用Airflow,因此忍受着我,而我正在嘗試做的是從BashOperator任務收集返回代碼並將其保存到本地變量,然後基於返回代碼分支出另一個任務。我遇到的問題是弄清楚如何讓BashOperator返回一些東西。以下是我的代碼段:Airflow BashOperator收集返回代碼

dag = DAG(dag_id='dag_1', 
     default_args=default_args, 
     schedule_interval='0 2 * * *', 
     user_defined_macros=user_def_macros, 
     dagrun_timeout=timedelta(minutes=60) 
    ) 
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag) 
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag) 
t2.set_upstream(oodas) 

我想的xcom_push但說實話不知道它是如何工作..這是收集結果的正確方法?在日誌中,最後一行是:命令退出,返回代碼爲0

回答

0

您可以發佈整個DAG嗎?我以爲你是在解釋氣流是如何工作的

從任務1(如果它是一個bash運營商)的問題,你可以這樣做:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag) 

而且在任務2:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag) 

其中Ti爲task_instance變量和{{}}表示法用於訪問變量部分

+0

我剛剛更新了更多代碼。所以我相信你是對的,我沒有正確使用它。這是我的任務實例「ti」只是「oodas」?我不確定BashOperator用於xcom的關鍵是什麼 –