Run an Airflow DAG every X minutes

2017-09-12

I'm running Airflow on an EC2 instance with the LocalScheduler option. I've invoked airflow scheduler and airflow webserver, and everything appears to be running fine. Even so, after supplying the cron string '*/10 * * * *' ("execute every 10 minutes") as the schedule_interval, the job keeps defaulting to running once every 24 hours. The head of the code is below:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('PREPROC_PATH') 

if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import workers 
else: 
    print('Define PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México 
    'max_active_runs': 1, 
    'concurrency': 4, 
    'schedule_interval': '*/10 * * * *' #..every 10 minutes 
} 

DAG = DAG(
    dag_id='dash_update', 
    default_args=default_args 
) 

... 

Answer


default_args is only there to fill in the params that get passed to the operators within a DAG. max_active_runs, concurrency, and schedule_interval are all parameters for initializing the DAG itself, not the operators. This is what you want:

DAG = DAG(
    dag_id='dash_update', 
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México 
    max_active_runs=1, 
    concurrency=4, 
    schedule_interval='*/10 * * * *', #..every 10 minutes 
    default_args=default_args, 
) 
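
For completeness, here is a minimal sketch of how the two kinds of settings fit together once a task is attached (the task function update_dash and the owner/retry values are hypothetical placeholders, not taken from the original code): operator-level defaults go in default_args and are forwarded to every task, while scheduling stays on the DAG constructor.

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# Operator-level defaults: forwarded to every task created in this DAG.
default_args = {
    'owner': 'airflow',                   # hypothetical owner
    'retries': 1,                         # hypothetical retry policy
    'retry_delay': timedelta(minutes=5),
}

# DAG-level settings: passed directly to the DAG constructor.
dag = DAG(
    dag_id='dash_update',
    start_date=datetime(2017, 9, 9, 10, 0, 0, 0),
    schedule_interval='*/10 * * * *',  # every 10 minutes
    max_active_runs=1,
    concurrency=4,
    default_args=default_args,
)

def update_dash():
    # Hypothetical stand-in for the real worker call imported from workers.
    pass

update_task = PythonOperator(
    task_id='update_dash',
    python_callable=update_dash,
    dag=dag,
)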

I've mixed them up before as well, so for reference (note that there is some overlap):

DAG parameters: https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG
Operator parameters: https://airflow.incubator.apache.org/code.html#baseoperator
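
As a small illustration of that overlap (a sketch against the same Airflow 1.x-era API): start_date is accepted both by the DAG constructor and, via default_args, by the operators, and either placement works, whereas schedule_interval only takes effect when passed to the DAG itself.

from datetime import datetime

from airflow.models import DAG

# start_date may live in default_args (tasks inherit it and the DAG picks it up)
# or be passed to the DAG directly; schedule_interval must be a DAG argument,
# which is why leaving it in default_args fell back to the once-a-day default.
dag = DAG(
    dag_id='dash_update',
    schedule_interval='*/10 * * * *',
    default_args={'start_date': datetime(2017, 9, 9, 10, 0, 0, 0)},
)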


Makes perfect sense, completely missed that. Thanks @Daniel – Aaron