2017-03-23

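I have the following DAG with two SSHExecuteOperator tasks. The first task executes a stored procedure that returns a parameter. The second task needs this parameter as input. How do I retrieve the value of an Airflow XCom pushed via SSHExecuteOperator?

Please explain how to get the value from the XCom pushed in task1 so that it can be used in task2.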

from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.contrib.hooks.ssh_hook import SSHHook 
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator 
from airflow.models import Variable 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 0 
} 

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin 
DataQualitySSHHook = Variable.get('DataQualitySSHHook') 
print('Connecting to: ' + DataQualitySSHHook) 
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook) 
sshHookEtl.no_host_key_check = True 

#create dag 
dag = DAG(
    'ed_data_quality_test-v0.0.3', #update version whenever you change something 
    default_args=default_args, 
    schedule_interval="0 0 * * *", 
    dagrun_timeout=timedelta(hours=24), 
    max_active_runs=1) 

#create tasks 
task1 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_batch_register', 
    bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end 
    ssh_hook=sshHookEtl, 
    xcom_push=True, 
    retries=0, 
    dag=dag) 

task2 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_module_session_start', 
    bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}", 
    ssh_hook=sshHookEtl, 
    retries=0, 
    dag=dag) 

#create dependencies 
task1.set_downstream(task2) 

Your DAG definition looks fine. Can you run the DAG successfully? Are there any errors?

Answer

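The solution I found is this: when task1 executes the shell script, you must make sure that the value you want captured as the XCom variable is the last thing the script prints (using echo).

I was then able to retrieve the XCom value with the following snippet: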

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
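
To illustrate why the value has to be the last thing printed, here is a minimal sketch that runs without Airflow. It assumes that the operator, with xcom_push=True, pushes only the final line the command writes to stdout. The helper name last_stdout_line and the child command are hypothetical: the child stands in for the remote shell script and is not part of Airflow.

```python
import subprocess
import sys


def last_stdout_line(argv):
    """Run a command and return the last line it wrote to stdout.

    Hypothetical helper: it only mimics the "last printed line becomes
    the XCom value" behaviour described above; it is not an Airflow API.
    """
    result = subprocess.run(argv, capture_output=True, text=True, check=True)
    lines = result.stdout.splitlines()
    # Anything printed before the final line is ignored.
    return lines[-1].strip() if lines else ""


if __name__ == "__main__":
    # Stand-in for the remote script: some log output, then the value
    # that task2 needs, printed last (the equivalent of a final `echo`).
    fake_script = (
        "print('registering batch'); "
        "print('rows affected: 1'); "
        "print('12345')"
    )
    print(last_stdout_line([sys.executable, "-c", fake_script]))  # prints 12345
```

If the script printed anything after the value, even a blank line, that later line would be captured instead, so any logging belongs before the final echo.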