2017-08-11 108 views
1

Airflow: how to push an XCom value from PostgresOperator?

I'm using Airflow 1.8.1 and I want to push the result of a SQL query from a PostgresOperator to XCom.

Here are my tasks:

check_task = PostgresOperator(
    task_id='check_task', 
    postgres_conn_id='conx', 
    sql="check_task.sql", 
    xcom_push=True, 
    dag=dag) 

def py_is_first_execution(**kwargs):
    # Pull whatever check_task pushed to XCom (None if nothing was pushed)
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print('count ----> %s' % value)
    if value == 0:
        return 'next_task'
    else:
        return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution', 
    python_callable=py_is_first_execution, 
    provide_context=True, 
    dag=dag) 

And here is my SQL script:

select count(1) from table 

When I check the XCom value from check_task, it retrieves None.

Answers

0

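In the end, I created a new operator, SqlExecuteOperator, as a plugin under $AIRFLOW_HOME/plugins.

I used CheckOperator as an example and modified the return value: the default behavior of that operator was exactly the opposite of what I needed.

The default operator I started from: CheckOperator

And here is my custom operator, SqlExecuteOperator: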

import logging

from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SqlExecuteOperator(BaseOperator):
    """
    Executes a sql query that is expected to return a single row and
    returns the first column of that row as an int. Because ``execute``
    returns a value, Airflow pushes it to XCom.

    The query is run through the hook returned by ``get_db_hook``,
    which is looked up from ``conn_id``.

    :param sql: the sql to be executed
    :type sql: string
    :param conn_id: id of the connection to run the query against
    :type conn_id: string
    """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context=None):
        logging.info('Executing SQL statement: ' + self.sql)
        # get_first returns only the first row of the result set
        records = self.get_db_hook().get_first(self.sql)
        logging.info("Record: " + str(records))
        # The value returned here is what ends up in XCom
        records_int = int(records[0])
        print(records_int)
        return records_int

    def get_db_hook(self):
        return BaseHook.get_hook(conn_id=self.conn_id)
1

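If I'm right, Airflow automatically pushes to XCom whatever an operator's execute method returns. However, when you look at the code of PostgresOperator, you'll see that its execute method calls the run method of PostgresHook (an extension of dbapi_hook). Neither method returns anything, so nothing is pushed to XCom.

What we did to fix this was to create a CustomPostgresSelectOperator, a copy of PostgresOperator that does 'return hook.get_records(..)' instead of 'hook.run(..)'.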
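To make the difference between the two calls concrete, here is a minimal sketch that runs without Airflow or Postgres. Everything in it is a stand-in and an assumption for illustration: StandInHook imitates a hook using an in-memory SQLite table named events, the two execute_* functions imitate the bodies of the stock and the custom operator, and run_task is a simplified model of Airflow storing a non-None return value under the "return_value" key. In a real operator the hook would presumably be a PostgresHook built from postgres_conn_id.

```python
import sqlite3


class StandInHook(object):
    """Hypothetical stand-in for PostgresHook, backed by in-memory SQLite."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE events (id INTEGER)")
        self.conn.executemany("INSERT INTO events VALUES (?)", [(1,), (2,), (3,)])

    def run(self, sql):
        # Executes the statement and returns nothing, as described above.
        self.conn.execute(sql)

    def get_records(self, sql):
        # Returns every row of the result set as a list of tuples.
        return self.conn.execute(sql).fetchall()


def execute_with_run(hook, sql):
    # Shape of the stock operator: the result is discarded, so this returns None.
    hook.run(sql)


def execute_with_get_records(hook, sql):
    # Shape of the custom operator: the rows are returned to the caller.
    return hook.get_records(sql)


def run_task(execute, hook, sql):
    # Assumed, simplified model of Airflow: store only a non-None return value.
    xcom = {}
    result = execute(hook, sql)
    if result is not None:
        xcom["return_value"] = result
    return xcom


hook = StandInHook()
query = "SELECT COUNT(1) FROM events"
pushed_by_run = run_task(execute_with_run, hook, query)
pushed_by_select = run_task(execute_with_get_records, hook, query)
print(pushed_by_run)
print(pushed_by_select)
```

Note that get_records returns a list of rows rather than a bare number, so with this approach the branch callable from the question would likely need to compare value[0][0] to 0 instead of value itself.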

Hope this helps.