3
因爲我剛剛開始使用Airflow,因此忍受着我,而我正在嘗試做的是從BashOperator任務收集返回代碼並將其保存到本地變量,然後基於返回代碼分支出另一個任務。我遇到的問題是弄清楚如何讓BashOperator返回一些東西。以下是我的代碼段:Airflow BashOperator收集返回代碼
dag = DAG(dag_id='dag_1',
default_args=default_args,
schedule_interval='0 2 * * *',
user_defined_macros=user_def_macros,
dagrun_timeout=timedelta(minutes=60)
)
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
我想的xcom_push但說實話不知道它是如何工作..這是收集結果的正確方法?在日誌中,最後一行是:命令退出,返回代碼爲0。
我剛剛更新了更多代碼。所以我相信你是對的,我沒有正確使用它。這是我的任務實例「ti」只是「oodas」?我不確定BashOperator用於xcom的關鍵是什麼 –