2017-04-19 52 views
0

我正在使用包含多個子數據標的主dag(main_dag),並且每個子數據都有一些任務。我從subdagA taskA中推送了一個xcom,但我在subdagB taskB中拉取了該xcom。由於xcom_pull()中的dag_id參數默認爲self.dag_id,因此我一直無法提取必要的xcom。我想知道如何做到這一點和/或是否有更好的方法來設定這種情況,所以我不必處理這個問題。什麼我目前在做subdagB從子dag中拉xcom

例如:

def subdagB(parent_dag, child_dag, start_date, schedule_interval): 
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval) 
    start = DummyOperator(
     task_id='taskA', 
     dag=subdagB) 
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};''' 
    t1 = BashOperator(
     task_id='taskB', 
     bash_command=tag_db_template, 
     xcom_push=True, 
     dag=subdagB) 
    end = DummyOperator(
     task_id='taskC', 
     dag=subdagB) 
    t0.set_upstream(start) 
    t1.set_upstream(t0) 
    end.set_upstream(t1) 
    return subdagB 

預先感謝您的幫助!

回答

0

只要你重寫[Operator] .xcom_pull(dag_id = dag_id,...)或[TaskInstance] .xcom_pull(dag_id = dag_id,...)中的dag_id,你應該沒問題。

where dag_id = "{parent_dag_id}.{child_dag_id}"

如果你可以讓你的例子更完整,我可以嘗試本地運行它,但我測試了(類似)的例子,跨subdag-xcoms正常工作。

+0

請注意,如果您使用TriggerDagRunOperator,則默認情況下,DagRun將具有不同的執行日期,並且您將無法跨* DAG取消xcoms *。您可以自定義dagrun_operator.py以保留調用DAG的execution_date以解決此問題。 – jastang