我無法正確理解如何正確打開和關閉數據庫會話,正如我通過sqlalchemy文檔理解的那樣,如果我使用scoped_session構造我的Session對象,然後使用返回的Session對象創建會話,它是線程安全的,所以基本上每個線程都會得到它自己的會話,並且不會有問題。現在下面的例子工作,我把它放在一個無限循環中,看它是否正確地關閉了會話,並且如果我正確地監視它(在mysql中執行「SHOW PROCESSLIST;」),連接只會繼續增長,它不會關閉它們,儘管我使用了session.close(),甚至在每次運行結束時刪除了scoped_session對象。我究竟做錯了什麼?我在更大的應用程序中的目標是使用所需的最少數量的數據庫連接,因爲我目前的工作實現在每個需要的方法中創建一個新的會話,並在返回之前關閉它,這似乎效率低下。SQLAlchemy在多線程應用程序中正確處理會話
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
謝謝你的信息,這是非常有益的確實。國王問候! – andrean 2012-03-09 09:01:15