23

我有一個反應器,從一個RabbitMQ的經紀人獲取消息,並引發工人方法的過程池來處理這些消息,是這樣的:如何處理ProcessPool中的SQLAlchemy連接?

Reactor

這是使用python asyncioloop.run_in_executor()concurrent.futures.ProcessPoolExecutor實施。

現在我想使用SQLAlchemy訪問worker方法中的數據庫。大多數處理將是非常簡單快速的CRUD操作。

該反應器將處理每秒10-50消息在一開始,所以這是不能接受打開爲每個請求一個新的數據庫連接。相反,我想維護每個進程的一個持久連接。

我的問題是:我該怎麼做?我可以將它們存儲在全局變量中嗎? SQA連接池會爲我處理這個問題嗎?反應堆停止時如何清理?

[更新]

  • 該數據庫是MySQL搭配InnoDB的。

爲什麼選擇這種模式與進程池?

當前實現使用每個消費者在自己的線程上運行不同的模式。不知何故,這不起作用。目前已有大約200名消費者在各自的線程中運行,並且系統正在快速增長。爲了更好地擴展,這個想法是將關注點分開,並在I/O循環中使用消息並將處理委託給池。當然,整個系統的性能主要是I/O界限。但是,處理大型結果集時,CPU是個問題。

另一個原因是「易用性」。雖然消息的連接處理和消耗是異步實現的,但工作者中的代碼可以是同步且簡單的。

不久,很明顯,從工人中訪問通過持續的網絡連接遠程系統是一個問題。這就是CommunicationChannels的用途:在工作人員內部,我可以通過這些渠道向消息總線發出請求。

我的一個想法目前是處理類似的方式DB訪問:通過隊列事件循環在那裏它們被髮送到DB傳遞報表。但是,我不知道如何使用SQLAlchemy完成此操作。 入口點在哪裏? 對象在通過隊列時需要爲pickled。我如何從SQA查詢中獲得這樣的對象? 爲了不阻塞事件循環,與數據庫的通信必須異步工作。我可以使用例如aiomysql作爲SQA的數據庫驅動程序?

+0

那麼每個工人都是自己的過程?那麼不能共享連接,所以也許你應該用最大1或2個連接限制實例化每個(本地)SQA池。然後觀察,也許通過數據庫(哪個數據庫?)什麼連接正在產生/被殺死。受到嚴重的傷害 - 你不願意做的是在SQA的頂端實施你自己的天真的連接池。或者嘗試確定SQA conn是否關閉。 –

+0

@JLPeyret:我用你要求的信息更新了問題。不,我不打算實現我自己的連接池。 – roman

+0

因此,我想我記得連接不能跨進程(在OS的意義上說,與線程區分)。而且我知道關係根本就不好。你應該能夠發送「死」(字符串)sql語句,但我相信你會很難通過db conns,我想可能包括SQA結果。對我的猜測,但在一定程度上玩奇怪的SQA用法來證明它。 –

回答

6

你的每個進程池進程一個數據庫連接可以很容易滿足,如果一些小心你如何實例化session,假設您正在使用ORM的工作,在工作進程的要求。

簡單的解決辦法是讓你跨請求重用全球session

# db.py 
engine = create_engine("connection_uri", pool_size=1, max_overflow=0) 
DBSession = scoped_session(sessionmaker(bind=engine)) 

而且對工人的任務:

# task.py 
from db import engine, DBSession 
def task(): 
    DBSession.begin() # each task will get its own transaction over the global connection 
    ... 
    DBSession.query(...) 
    ... 
    DBSession.close() # cleanup on task end 

參數pool_sizemax_overflowcustomize所使用的默認QueuePool create_engine。 pool_size將確保您的進程只保留進程池中每個進程的1個連接。

如果您希望它重新連接,您可以使用DBSession.remove()這將從註冊表中刪除會話,並將使它在下一次DBSession使用時重新連接。您也可以使用recycle參數Pool在指定的時間段之後使連接重新連接。

在開發/ debbuging期間,您可以使用AssertionPool,如果從池中檢出多個連接,將會引發異常,請參閱switching pool implementations關於如何執行此操作。

+0

所以你基本上是建議我不要擔心,因爲SQA池會處理這個問題。這會很好!我將在接下來的幾天內將我們的主應用程序+200用戶和+20000行代碼移植到新的軟件架構中,並查看它是否可行。 – roman

+0

@roman祝你好運,如果你有任何問題,請不要猶豫,在這裏發表評論,如果你覺得我覆蓋了你的問題,這將是很好的標記爲接受:)。 – olokki

+0

似乎到目前爲止工作得很好! :)應該提及文檔中的這一部分,我認爲http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html?highlight=multiprocessing#using-connection-pools-with-multiprocessing。人們必須特別關心多處理。 – roman

0

@roman:你在那裏有很好的挑戰。

我已經在類似的情況是之前,所以這是我的2美分:除非只是這種消費「讀」「寫」消息,沒有做任何真正等待處理,你可以重新設計這個消費者作爲消費者/生產者將會消耗消費消息,它將處理消息並且然後將結果放到另一個隊列中,那個隊列(處理的消息對於說)可以被1..N非讀取 - 在本身的整個生命週期中打開數據庫連接的異步過程。

我可以延伸我的答案,但我不知道這種方法是否適合您的需求,如果是這樣,我可以給你更多關於擴展設計的細節。

+0

我正在考慮這樣一種方法,但我認爲這將是很難得到正確的交易處理。我想我不想嘗試構建我自己的分佈式事務管理器。 – roman

0

一個非常好的方法是使用web服務器來處理和擴展進程池。即使在默認狀態下,flask-sqlalchemy也會保留連接池,並且不會在每個請求響應週期中關閉每個連接。

asyncio執行程序可以調用url終點來執行您的功能。額外的好處是,因爲所有執行這項工作的進程都在url的後面,所以你可以在多臺機器上擴展你的工作池,通過gunicorn或其他許多方法來擴展一個簡單的wsgi服務器。再加上你獲得了所有的容錯善良。

不利的一面是,您可能會通過網絡傳遞更多信息。不過,正如你所說的那樣,問題是CPU綁定,你可能會傳遞更多的數據到數據庫。

+0

當我說CPU是一個問題時,我並不是說主要的工作負載是CPU綁定的!這不是......與上面的其他方法一樣,我在這裏看到了交易處理的嚴重問題。在業務邏輯和持久層之間建立無狀態網絡連接聽起來很可怕。 – roman