2017-02-09 64 views
2

我收到以下錯誤(我認爲是因爲在我的應用程序分叉),「此結果對象不返回行」。分叉,sqlalchemy和範圍會話

Traceback 
--------- 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task 
result = _execute_task(task, data) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task 
return func(*args2) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp 
Correlation.user_id.in_(user_ids)).all()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all 
return list(self) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances 
fetch = cursor.fetchall() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall 
self.cursor, self.context) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception 
util.reraise(*exc_info) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall 
l = self.process_rows(self._fetchall_impl()) 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl 
self._non_result() 
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result 
"This result object does not return rows. " 

我正在使用dask和它的多處理調度程序(它使用multiprocessing.Pool)。 據我瞭解(基於文檔),從範圍會話對象創建的會話(通過scoped_session()創建)是線程安全的。這是因爲他們是threadlocal。這將導致我相信,當我撥打Session()(或使用代理Session)時,我得到一個只存在的會話對象,並且只能從被調用的線程訪問。 這看起來很簡單。

我感到困惑的是,爲什麼我在分叉過程中遇到問題。據我所知,你不能 重複使用跨進程的引擎,所以我從文檔遵循基於事件的解決方案,並逐字做到了這一點:

class _DB(object): 

    _engine = None 

    @classmethod 
    def _get_engine(cls, force_new=False): 
     if cls._engine is None or force_new is True: 
      cfg = Config.get_config() 
      user = cfg['USER'] 
      host = cfg['HOST'] 
      password = cfg['PASSWORD'] 
      database = cfg['DATABASE'] 
      engine = create_engine(
       'mysql://{}:{}@{}/{}?local_infile=1&' 
       'unix_socket=/var/run/mysqld/mysqld.sock'. 
        format(user, password, host, database), 
       pool_size=5, pool_recycle=3600) 
      cls._engine = engine 
     return cls._engine 



# From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "connect") 
def connect(dbapi_connection, connection_record): 
    connection_record.info['pid'] = os.getpid() 

#From the docs, handles multiprocessing 
@event.listens_for(_DB._get_engine(), "checkout") 
def checkout(dbapi_connection, connection_record, connection_proxy): 
    pid = os.getpid() 
    if connection_record.info['pid'] != pid: 
     connection_record.connection = connection_proxy.connection = None 
     raise exc.DisconnectionError(
      "Connection record belongs to pid %s, " 
      "attempting to check out in pid %s" % 
      (connection_record.info['pid'], pid) 
     ) 


# The following is how I create the scoped session object. 

Session = scoped_session(sessionmaker(
    bind=_DB._get_engine(), autocommit=False, autoflush=False)) 

Base = declarative_base() 
Base.query = Session.query_property() 

所以我的假設(基於文檔)有以下幾種:

  1. 使用從範圍的會話對象創建一個會話對象,它必須給我一個ThreadLocal會話(這在我的情況下,也只是子進程的主線程)。雖然沒有在文檔中,我想這應該適用,即使範圍會話對象是在另一個進程中創建的。

  2. ThreadLocal的會議將獲得通過發動機從池中的連接,如果連接沒有這個過程中創建它會創建一個新的(基於上述connection()checkout()實現。)

如果這兩件事情都是真的,那麼一切都應該「正常工作」(AFAICT)。但情況並非如此。

我設法通過在每個新進程中創建一個新的有限會話會話對象,並在所有後續使用會話的呼叫中使用它。

順便提一下,Base.query屬性也需要從這個新的有作用域的會話對象中更新。

我想我上面的#1假設是不正確的。任何人都可以幫助我理解爲什麼我需要在每個進程中創建一個新的範圍會話對象?

乾杯。

+0

你可以發佈一個最小的例子,包括分叉的代碼,以及完整的堆棧跟蹤?我懷疑連接池在fork之前已經連接到數據庫,導致兩個進程共享套接字。 – univerio

+0

我將添加一些示例代碼。在完成任何分支之前,池肯定已經連接,但是使用池的子進程通過在使用它之前檢查調用代碼的pid來處理,或者創建一個新進程(按照上面的「checkout」方法)。或者至少這是意圖的AIUI。 –

+0

您可以創建單節點分佈式調度程序,而不是使用'dask.multiprocessing.get'。這將從更清潔的流程中預先分流,並且通常是更清潔的體驗:http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin

回答

0

不清楚什麼時候發生fork,但最常見的問題是引擎是在fork之前創建的,它使用pool_size = 5初始化與數據庫的TCP連接,然後將其複製到新進程並導致多個進程與相同的物理套接字交互=>麻煩。

選項是:

  • 禁用池和使用按需連接:poolclass = NullPool
  • 重新創建後叉池:sqla_engine。dispose()
  • 延遲的create_engine直到叉後
+0

它肯定是在引擎創建後分叉的,但是AIUI是整個點自定義'checkout()'方法;使用一個池來處理多個進程。我懷疑這與dask推動這個過程的方式有關。 –