2017-06-13 40 views
1

我可以在PythonOperator中使用宏嗎?我試着跟着,但我無法得到渲染的宏!Airflow Python運算符中的宏

dag = DAG(
    'temp', 
    default_args=default_args, 
    description='temp dag', 
    schedule_interval=timedelta(days=1)) 

def temp_def(a, b, **kwargs): 
    print '{{ds}}' 
    print '{{execution_date}}' 
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) 

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = PythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 

回答

7

僅對模板化字段進行宏處理。爲了讓Jinja處理這個領域,請使用您自己的擴展PythonOperator

class MyPythonOperator(PythonOperator): 
    template_fields = ('templates_dict','op_args') 

我加'templates_dict'template_fields因爲PythonOperator本身具有該領域模板: PythonOperator

現在,你應該能夠在該領域中使用宏:

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 
+1

我們可以標記這是正確的答案嗎?因爲它是正確的答案 –

+1

爲了向後兼容,你可以像這樣'template_fields':'template_fields = PythonOperator.template_fields +('op_args',)''。順便說一句,我打開了一個[JIRA來添加'op_args'和'op_kwargs'到'PythonOperator'模板字段](https://issues.apache.org/jira/browse/AIRFLOW-1814) –

1

在我意見更接近本地的Airflow方式是使用包含的PythonOperator並使用參數provide_context=True

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    provide_context=True, 
    dag=dag) 

現在,您可以訪問所有的宏,氣流元數據和任務參數中的kwargs您可調用

def temp_def(**kwargs): 
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date'])) 

如果你有定義params一些自定義的與您可以訪問這些任務相關聯以及通過kwargs['params']

+0

這可能是更好的方法正在做。我的回答主要針對宏爲什麼沒有被處理的具體問題。 – jhnclvr