2016-09-21 137 views
2

我有一系列Python文件裏面有一系列Python任務:file1.py,..... 放在一個文件夾中。如何使用AirFlow運行Python任務列表?

我讀了Airflow文檔,沒有看到如何指定DAG中python文件的文件夾和文件名?

我想執行那些python文件(不是通過Python運算符的Python函數)。

任務1:執行file1.py(有一些進口包) 任務2:執行file2.py(與其他一些進口包)

這將是有益的。 謝謝,問候

回答

0

您可以通過將包含任務的python文件導入到設計DAG的主文件中來調用您的所有任務。安裝爲排風安裝的一部分教程示例(example_python_operator)有一個很好的例子:

from __future__ import print_function 
from builtins import range 
from airflow.operators import PythonOperator 
from airflow.models import DAG 
from datetime import datetime, timedelta 

import time 
from pprint import pprint 

seven_days_ago = datetime.combine(
     datetime.today() - timedelta(7), datetime.min.time()) 

args = { 
    'owner': 'airflow', 
    'start_date': seven_days_ago, 
} 

dag = DAG(
    dag_id='example_python_operator', default_args=args, 
    schedule_interval=None) 


def my_sleeping_function(random_base): 
    '''This is a function that will run within the DAG execution''' 
    time.sleep(random_base) 


def print_context(ds, **kwargs): 
    pprint(kwargs) 
    print(ds) 
    return 'Whatever you return gets printed in the logs' 

run_this = PythonOperator(
    task_id='print_the_context', 
    provide_context=True, 
    python_callable=print_context, 
    dag=dag) 

for i in range(10): 
    ''' 
    Generating 10 sleeping task, sleeping from 0 to 9 seconds 
    respectively 
    ''' 
    task = PythonOperator(
     task_id='sleep_for_'+str(i), 
     python_callable=my_sleeping_function, 
     op_kwargs={'random_base': float(i)/10}, 
     dag=dag) 

    task.set_upstream(run_this) 
+0

什麼是我想要執行特定的python文件而不是python函數? python_callable ='file1.py' – Tensor

+0

@Tensor我還沒有嘗試過這一點,無法找到並在文檔中引用,但我看到與SSHExecuteOperator它可以引用一個bash腳本文件(如果這有助於反正)。看看https://pythonhosted.org/airflow/_modules/ssh_execute_operator.html#SSHExecuteOperator – Srikanta

6

您可以使用BashOperator執行Python文件作爲任務

from airflow import DAG 
    from airflow.operators import BashOperator,PythonOperator 
    from datetime import datetime, timedelta 

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7), 
             datetime.min.time()) 

    default_args = { 
     'owner': 'airflow', 
     'depends_on_past': False, 
     'start_date': seven_days_ago, 
     'email': ['[email protected]'], 
     'email_on_failure': False, 
     'email_on_retry': False, 
     'retries': 1, 
     'retry_delay': timedelta(minutes=5), 
    ) 

    dag = DAG('simple', default_args=default_args) 
t1 = BashOperator(
    task_id='testairflow', 
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py', 
    dag=dag) 
+4

爲什麼你在這裏導入PythonOperator?你似乎沒有使用它。 – benten

3

要執行Python文件作爲一個整體,使用BashOperator(如liferacer的答案):

from airflow.operators.bash_operator import BashOperator 

bash_task = PythonOperator(
    task_id='bash_task', 
    bash_command='python file1.py', 
    dag=dag 
) 

然後,使用PythonOperator打電話給你做功能。你應該已經有一個__main__塊,所以擺在那裏發生的事情爲main功能,使得您的file1.py看起來像這樣:

def main(): 
    """This gets executed if `python file1` gets called.""" 
    # my code 

if __name__ == '__main__': 
    main() 

那麼你的DAG定義:

from airflow.operators.bash_operator import PythonOperator 

import file1 

python_task = PythonOperator(
    task_id='python_task', 
    python_callable=file1.main, 
    dag=dag 
) 
相關問題