我有一個DAG,它是通過查詢DynamoDB獲取列表創建的,並且列表中的每個項目都使用PythonOperator創建並將其添加到DAG中。在下面的例子中沒有顯示,但重要的是要注意列表中的一些項目取決於其他任務,所以我使用set_upstream
來強制執行依賴關係。動態創建任務列表
- airflow_home
\- dags
\- workflow.py
workflow.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
問題是workflow.py
是越來越一遍又一遍地跑,我的get_task_list()
方法得到通過AWS和拋出異常節流(每一個任務運行?時間)。
我認爲這是因爲每當run_task()
被稱爲它運行所有的全局數據workflow.py
所以我試着移動run_task()
到一個單獨的模塊,像這樣:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
但它並沒有改變任何東西。我甚至嘗試將get_task_list()
放入一個使用工廠函數包裝的SubDagOperator中,該工具的行爲方式仍然相同。
我的問題與這些問題有關嗎?
而且,爲什麼workflow.py
得到運行,因此經常和爲什麼會任務的時候方法不引用workflow.py
通過get_task_list()
拋出事業的各個任務的錯誤和失敗對它沒有依賴性?
最重要的是,什麼是最好的方式來並行處理列表並強制列表中的項目之間的任何依賴關係?
將'min_file_process_interval'設置爲30,將對'get_task_list()'的調用減慢到30秒,並且我停止了被限制。至於動態任務創建,我將嘗試創建一個dag,它將構建另一個dag並將其保存到'globals()[dag_id]',如[FAQ]中所述(http://airflow.readthedocs.io/ EN /最新/ faq.html常見?亮點=動態#如何,可以-I-創建-DAG的-動態) –