我有一系列Python文件裏面有一系列Python任務:file1.py,..... 放在一個文件夾中。如何使用AirFlow運行Python任務列表?
我讀了Airflow文檔,沒有看到如何指定DAG中python文件的文件夾和文件名?
我想執行那些python文件(不是通過Python運算符的Python函數)。
任務1:執行file1.py(有一些進口包) 任務2:執行file2.py(與其他一些進口包)
這將是有益的。 謝謝,問候
我有一系列Python文件裏面有一系列Python任務:file1.py,..... 放在一個文件夾中。如何使用AirFlow運行Python任務列表?
我讀了Airflow文檔,沒有看到如何指定DAG中python文件的文件夾和文件名?
我想執行那些python文件(不是通過Python運算符的Python函數)。
任務1:執行file1.py(有一些進口包) 任務2:執行file2.py(與其他一些進口包)
這將是有益的。 謝謝,問候
您可以通過將包含任務的python文件導入到設計DAG的主文件中來調用您的所有任務。安裝爲排風安裝的一部分教程示例(example_python_operator)有一個很好的例子:
from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import time
from pprint import pprint
seven_days_ago = datetime.combine(
datetime.today() - timedelta(7), datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(
dag_id='example_python_operator', default_args=args,
schedule_interval=None)
def my_sleeping_function(random_base):
'''This is a function that will run within the DAG execution'''
time.sleep(random_base)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag)
for i in range(10):
'''
Generating 10 sleeping task, sleeping from 0 to 9 seconds
respectively
'''
task = PythonOperator(
task_id='sleep_for_'+str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i)/10},
dag=dag)
task.set_upstream(run_this)
您可以使用BashOperator執行Python文件作爲任務
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': seven_days_ago,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
)
dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
task_id='testairflow',
bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
dag=dag)
爲什麼你在這裏導入PythonOperator?你似乎沒有使用它。 – benten
要執行Python文件作爲一個整體,使用BashOperator
(如liferacer的答案):
from airflow.operators.bash_operator import BashOperator
bash_task = PythonOperator(
task_id='bash_task',
bash_command='python file1.py',
dag=dag
)
然後,使用PythonOperator
打電話給你做功能。你應該已經有一個__main__
塊,所以擺在那裏發生的事情爲main
功能,使得您的file1.py
看起來像這樣:
def main():
"""This gets executed if `python file1` gets called."""
# my code
if __name__ == '__main__':
main()
那麼你的DAG定義:
from airflow.operators.bash_operator import PythonOperator
import file1
python_task = PythonOperator(
task_id='python_task',
python_callable=file1.main,
dag=dag
)
什麼是我想要執行特定的python文件而不是python函數? python_callable ='file1.py' – Tensor
@Tensor我還沒有嘗試過這一點,無法找到並在文檔中引用,但我看到與SSHExecuteOperator它可以引用一個bash腳本文件(如果這有助於反正)。看看https://pythonhosted.org/airflow/_modules/ssh_execute_operator.html#SSHExecuteOperator – Srikanta