Forking, SQLAlchemy, and scoped sessions: I'm getting the following error, which I believe is caused by forking in my application: "This result object does not return rows."
Traceback
---------
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 263, in execute_task
result = _execute_task(task, data)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/dask/async.py", line 245, in _execute_task
return func(*args2)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/smg/analytics/services/impact_analysis.py", line 140, in _do_impact_analysis_mp
Correlation.user_id.in_(user_ids)).all())
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2241, in all
return list(self)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 65, in instances
fetch = cursor.fetchall()
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 752, in fetchall
self.cursor, self.context)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1027, in _handle_dbapi_exception
util.reraise(*exc_info)
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 746, in fetchall
l = self.process_rows(self._fetchall_impl())
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 715, in _fetchall_impl
self._non_result()
File "/opt/miniconda/envs/analytical-engine/lib/python2.7/site-packages/sqlalchemy/engine/result.py", line 720, in _non_result
"This result object does not return rows. "
I'm using dask and its multiprocessing scheduler (which uses multiprocessing.Pool). As I understand it (based on the docs), sessions created from a scoped session object (created via scoped_session()) are thread-safe, because they are thread-local. This leads me to believe that when I call Session() (or use the proxy Session), I get a session object that exists in, and is only accessible from, the calling thread. That seems straightforward enough.
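As a minimal sketch of what I mean (illustrative only, not code from my application), asking the registry for a session twice in the same thread should hand back the same object:

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Throwaway in-memory engine, purely to illustrate the thread-local registry.
engine = create_engine('sqlite://')
Session = scoped_session(sessionmaker(bind=engine))

s1 = Session()
s2 = Session()
assert s1 is s2      # same thread -> same registered session object
Session.remove()     # discard this thread's session when finished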
What confuses me is why I run into problems when the process forks. I understand that you can't reuse an engine across processes, so I followed the event-based solution from the docs and implemented it verbatim:
class _DB(object):

    _engine = None

    @classmethod
    def _get_engine(cls, force_new=False):
        if cls._engine is None or force_new is True:
            cfg = Config.get_config()
            user = cfg['USER']
            host = cfg['HOST']
            password = cfg['PASSWORD']
            database = cfg['DATABASE']
            engine = create_engine(
                'mysql://{}:{}@{}/{}?local_infile=1&'
                'unix_socket=/var/run/mysqld/mysqld.sock'.format(
                    user, password, host, database),
                pool_size=5, pool_recycle=3600)
            cls._engine = engine
        return cls._engine


# From the docs, handles multiprocessing
@event.listens_for(_DB._get_engine(), "connect")
def connect(dbapi_connection, connection_record):
    connection_record.info['pid'] = os.getpid()


# From the docs, handles multiprocessing
@event.listens_for(_DB._get_engine(), "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    pid = os.getpid()
    if connection_record.info['pid'] != pid:
        connection_record.connection = connection_proxy.connection = None
        raise exc.DisconnectionError(
            "Connection record belongs to pid %s, "
            "attempting to check out in pid %s" %
            (connection_record.info['pid'], pid)
        )


# The following is how I create the scoped session object.
Session = scoped_session(sessionmaker(
    bind=_DB._get_engine(), autocommit=False, autoflush=False))

Base = declarative_base()
Base.query = Session.query_property()
So my assumptions (based on the docs) are the following:

1. Creating a session from the scoped session object must give me a thread-local session (which, in my case, would just belong to the main thread of the child process). Although it isn't stated in the docs, I assume this should hold even if the scoped session object was created in another process.
2. The thread-local session will get a connection from the engine's pool; if the connection was not created in this process, the pool will create a new one (per the connect() and checkout() implementations above).

If both of those things were true, then everything should "just work" (AFAICT). That is not the case, though.
I managed to get around this by creating a new scoped session object in each new process and using it in all subsequent calls that use a session (sketched below). Incidentally, the Base.query attribute also needed to be rebound to this new scoped session object.
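Concretely, the workaround looks roughly like this (a simplified sketch of the idea, not my exact worker code; _init_worker_session and _worker are illustrative names):

# Build a fresh engine and scoped session inside each forked worker,
# and rebind Base.query to it before running any queries.
def _init_worker_session():
    engine = _DB._get_engine(force_new=True)    # new engine for this process
    session = scoped_session(sessionmaker(
        bind=engine, autocommit=False, autoflush=False))
    Base.query = session.query_property()       # point Base.query at the new session
    return session


def _worker(user_ids):
    session = _init_worker_session()
    try:
        return session.query(Correlation).filter(
            Correlation.user_id.in_(user_ids)).all()
    finally:
        session.remove()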
I suspect my assumption #1 above is incorrect. Can anyone help me understand why I need to create a new scoped session object in each process?

Cheers.
Can you post a minimal example, including the code that forks, as well as the full stack trace? I suspect your connection pool has already connected to the database before the fork, causing the two processes to share a socket. – univerio
I'll add some sample code. The pool is definitely connected before any forking is done, but a child process using the pool is supposed to be handled by checking the pid of the calling code before a connection is used, and creating a new one if necessary (per the "checkout" method above). Or at least that's the intent, AIUI. –
Rather than using 'dask.multiprocessing.get', you could create a single-node distributed scheduler. It forks workers early, from a cleaner process, and is generally a cleaner experience: http://dask.pydata.org/en/latest/scheduler-choice.html – MRocklin
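A minimal sketch of that suggestion, assuming the distributed package is installed, reusing the _worker sketch from above, and with user_id_chunks standing in for however the work is partitioned:

# Single-machine distributed scheduler instead of dask.multiprocessing.get.
from dask import delayed
from dask.distributed import Client

client = Client()    # starts a local scheduler plus worker processes
futures = client.compute([delayed(_worker)(chunk) for chunk in user_id_chunks])
results = client.gather(futures)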