0
我正在使用包含多個子數據標的主dag(main_dag),並且每個子數據都有一些任務。我從subdagA taskA中推送了一個xcom,但我在subdagB taskB中拉取了該xcom。由於xcom_pull()中的dag_id參數默認爲self.dag_id,因此我一直無法提取必要的xcom。我想知道如何做到這一點和/或是否有更好的方法來設定這種情況,所以我不必處理這個問題。什麼我目前在做subdagB從子dag中拉xcom
例如:
def subdagB(parent_dag, child_dag, start_date, schedule_interval):
subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
start = DummyOperator(
task_id='taskA',
dag=subdagB)
tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
t1 = BashOperator(
task_id='taskB',
bash_command=tag_db_template,
xcom_push=True,
dag=subdagB)
end = DummyOperator(
task_id='taskC',
dag=subdagB)
t0.set_upstream(start)
t1.set_upstream(t0)
end.set_upstream(t1)
return subdagB
預先感謝您的幫助!
請注意,如果您使用TriggerDagRunOperator,則默認情況下,DagRun將具有不同的執行日期,並且您將無法跨* DAG取消xcoms *。您可以自定義dagrun_operator.py以保留調用DAG的execution_date以解決此問題。 – jastang