我想使用Airflow執行一個簡單的任務python。氣流 - Python文件不在同一個DAG文件夾中
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
,如果我嘗試,例如:
airflow test python_test print 2015-01-01
它的工作原理!
現在我想把我的def print_context(ds, **kwargs)
函數放在其他python文件中。所以,我創建了一個名爲antoher文件:simple_test.py和變化:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
現在我再次嘗試運行:
airflow test python_test print 2015-01-01
和OK!它仍然工作!
但是,如果我創建一個模塊,例如,工作模塊與文件SimplePython.py
,進口(from worker import SimplePython
),並嘗試:
airflow test python_test print 2015-01-01
它給人的消息:
ImportError: No module named worker
的問題:
- 是否可以在DAG定義中導入模塊?
- Airflow + Celery如何將所有必需的Python源文件分佈在工作節點上?