我想觸發simplehttpoperator,像這樣: 氣流trigger_dag test_trigger --conf '{ 「名」: 「東西」}'如何將參數從pythonoperator任務傳遞到airflow中的simplehttpoperator任務dag?
然後我用pythonoperator python_callable使用kwargs [ 'dag_run' 接受的參數] .conf,我想將['dag_run']。conf傳遞給simplehttpoperator,我該怎麼做?任何人都可以幫忙
cc_ = {}
def run_this_func(ds, **kwargs):
cc_ = kwargs['dag_run'].conf
logging.info(cc_)
return cc_
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
http_task = SimpleHttpOperator(
task_id='http_task',
http_conn_id='test_http',
method='POST',
endpoint='/api/v1/function',
data=cc_,
headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
response_check=lambda response: True if "10000" in response.content else False,
dag=dag)
http_task.set_upstream(run_this)
但如何在simplehttpoperator中使用XCOM?你可以給任何案件代碼? – pyfroggogogo
@pyfroggogogo,我更新了一些代碼示例,嘗試這個工作 – Chengzhi
模板必須作爲字符串傳遞。您可以使用'data = json.loads('{{... | tojson}}')'在渲染後將其恢復爲字典類型。 –