我有一個反應器,從一個RabbitMQ的經紀人獲取消息,並引發工人方法的過程池來處理這些消息,是這樣的:如何處理ProcessPool中的SQLAlchemy連接?
這是使用python asyncio
,loop.run_in_executor()
和concurrent.futures.ProcessPoolExecutor
實施。
現在我想使用SQLAlchemy訪問worker方法中的數據庫。大多數處理將是非常簡單快速的CRUD操作。
該反應器將處理每秒10-50消息在一開始,所以這是不能接受打開爲每個請求一個新的數據庫連接。相反,我想維護每個進程的一個持久連接。
我的問題是:我該怎麼做?我可以將它們存儲在全局變量中嗎? SQA連接池會爲我處理這個問題嗎?反應堆停止時如何清理?
[更新]
- 該數據庫是MySQL搭配InnoDB的。
爲什麼選擇這種模式與進程池?
當前實現使用每個消費者在自己的線程上運行不同的模式。不知何故,這不起作用。目前已有大約200名消費者在各自的線程中運行,並且系統正在快速增長。爲了更好地擴展,這個想法是將關注點分開,並在I/O循環中使用消息並將處理委託給池。當然,整個系統的性能主要是I/O界限。但是,處理大型結果集時,CPU是個問題。
另一個原因是「易用性」。雖然消息的連接處理和消耗是異步實現的,但工作者中的代碼可以是同步且簡單的。
不久,很明顯,從工人中訪問通過持續的網絡連接遠程系統是一個問題。這就是CommunicationChannels的用途:在工作人員內部,我可以通過這些渠道向消息總線發出請求。
我的一個想法目前是處理類似的方式DB訪問:通過隊列事件循環在那裏它們被髮送到DB傳遞報表。但是,我不知道如何使用SQLAlchemy完成此操作。 入口點在哪裏? 對象在通過隊列時需要爲pickled
。我如何從SQA查詢中獲得這樣的對象? 爲了不阻塞事件循環,與數據庫的通信必須異步工作。我可以使用例如aiomysql作爲SQA的數據庫驅動程序?
那麼每個工人都是自己的過程?那麼不能共享連接,所以也許你應該用最大1或2個連接限制實例化每個(本地)SQA池。然後觀察,也許通過數據庫(哪個數據庫?)什麼連接正在產生/被殺死。受到嚴重的傷害 - 你不願意做的是在SQA的頂端實施你自己的天真的連接池。或者嘗試確定SQA conn是否關閉。 –
@JLPeyret:我用你要求的信息更新了問題。不,我不打算實現我自己的連接池。 – roman
因此,我想我記得連接不能跨進程(在OS的意義上說,與線程區分)。而且我知道關係根本就不好。你應該能夠發送「死」(字符串)sql語句,但我相信你會很難通過db conns,我想可能包括SQA結果。對我的猜測,但在一定程度上玩奇怪的SQA用法來證明它。 –