2017-07-15 66 views
0

我有一個DAG,它是通過查詢DynamoDB獲取列表創建的,並且列表中的每個項目都使用PythonOperator創建並將其添加到DAG中。在下面的例子中沒有顯示,但重要的是要注意列表中的一些項目取決於其他任務,所以我使用set_upstream來強制執行依賴關係。動態創建任務列表

- airflow_home 
    \- dags 
    \- workflow.py 

workflow.py

def get_task_list(): 
    # ... query dynamodb ... 

def run_task(task): 
    # ... do stuff ... 

dag = DAG(dag_id='my_dag', ...) 
tasks = get_task_list() 
for task in tasks: 
    t = PythonOperator(
     task_id=task['id'], 
     provide_context=False, 
     dag=dag, 
     python_callable=run_task, 
     op_args=[task] 
    ) 

問題是workflow.py是越來越一遍又一遍地跑,我的get_task_list()方法得到通過AWS和拋出異常節流(每一個任務運行?時間)。

我認爲這是因爲每當run_task()被稱爲它運行所有的全局數據workflow.py所以我試着移動run_task()到一個單獨的模塊,像這樣:

- airflow_home 
    \- dags 
    \- workflow.py 
    \- mypackage 
     \- __init__ 
     \- task.py 

但它並沒有改變任何東西。我甚至嘗試將get_task_list()放入一個使用工廠函數包裝的SubDagOperator中,該工具的行爲方式仍然相同。

我的問題與這些問題有關嗎?

而且,爲什麼workflow.py得到運行,因此經常和爲什麼會任務的時候方法不引用workflow.py通過get_task_list()拋出事業的各個任務的錯誤和失敗對它沒有依賴性?

最重要的是,什麼是最好的方式來並行處理列表並強制列表中的項目之間的任何依賴關係?

回答

1

根據您引用的問題,在dag運行時,氣流不支持創建任務。

因此,氣流將在開始運行之前定期生成完整的DAG定義。理想情況下,此類生成的時間應該與該DAG的時間間隔相同。

但是這可能是每次氣流檢查dag的變化時,它也會生成完整的dag,從而導致請求過多。該時間使用airflow.cfg中的配置min_file_process_interval和dag_dir_list_interval進行控制。

關於任務失敗,他們失敗,因爲創建DAG本身失敗,氣流無法啓動它們。

+0

將'min_file_process_interval'設置爲30,將對'get_task_list()'的調用減慢到30秒,並且我停止了被限制。至於動態任務創建,我將嘗試創建一個dag,它將構建另一個dag並將其保存到'globals()[dag_id]',如[FAQ]中所述(http://airflow.readthedocs.io/ EN /最新/ faq.html常見?亮點=動態#如何,可以-I-創建-DAG的-動態) –