考慮以下DAG示例,其中第一個任務get_id_creds
從數據庫中提取憑證列表。此操作告訴我數據庫中的哪些用戶能夠運行進一步的數據預處理,並將這些ID寫入文件/tmp/ids.txt
。然後,我將這些ID掃描到我的DAG中,並使用它們生成可並行運行的upload_transaction
任務列表。如何動態迭代上游任務的輸出以在氣流中創建並行任務?
我的問題是:有沒有更習慣性地使用氣流做到這一點的正確動態方法?我在這裏感到笨拙和脆弱。我如何直接將一個有效的ID列表從一個進程傳遞到定義後續下游進程?
from datetime import datetime, timedelta
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'schedule_interval': None
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
get_id_creds = PythonOperator(
task_id='get_id_creds',
python_callable=dash_workers.get_id_creds,
provide_context=True,
dag=DAG)
with open('/tmp/ids.txt', 'r') as infile:
ids = infile.read().splitlines()
for uid in uids:
upload_transactions = PythonOperator(
task_id=uid,
python_callable=dash_workers.upload_transactions,
op_args=[uid],
dag=DAG)
upload_transactions.set_downstream(get_id_creds)
檢查https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow –
@JuanRiaza就是這樣,非常感謝。我最終使用的解決方案簡化了代碼,足以將我的解決方案作爲一個單獨的答案發布 - 我在當天結束時沒有任何對'Xcom'的需求 – Aaron