2017-06-21 61 views
0

我正在使用SQLAlchemy和multiprocessing。我也使用scoped_session sinse它避免共享相同的會話,但我發現一個錯誤和他們的解決方案,但我不明白爲什麼它會發生。scoped_session中的進程邊界

下面你可以看到我的代碼:

db.py

engine = create_engine(connection_string) 

Session = sessionmaker(bind=engine) 
DBSession = scoped_session(Session) 

script.py

from multiprocessing import Pool, current_process 
from db import DBSession 

def process_feed(test): 
    session = DBSession() 
    print(current_process().name, session) 

def run(): 
    session = DBSession() 
    pool = Pool() 
    print(current_process().name, session) 
    pool.map_async(process_feed, [1, 2]).get() 

if __name__ == "__main__": 
    run() 

當我運行script.py輸出是:

MainProcess <sqlalchemy.orm.session.Session object at 0xb707b14c> 
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb707b14c> 
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb707b14c> 

注意,會話對象是在主過程相同0xb707b14c和工人(子進程)

,但如果我改變的前兩行的順序運行():

def run(): 
    pool = Pool() # <--- Now pool is instanced in the first line 
    session = DBSession() # <--- Now session is instanced in the second line 
    print(current_process().name, session) 
    pool.map_async(process_feed, [1, 2]).get() 

而我跑script.py再次輸出:

MainProcess <sqlalchemy.orm.session.Session object at 0xb66907cc> 
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb669046c> 
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb66905ec> 

現在會話實例是不同的。

回答

1

要理解爲什麼發生這種情況,您需要了解scoped_sessionPool實際上做了什麼。 scoped_session保持會話的註冊表,以便發生以下情況

  • 你第一次叫DBSession,它在註冊表中爲您創建一個Session對象
  • 隨後,如有必要條件滿足(即同一個線程,會話尚未關閉),它不會創建一個新的Session對象,而是返回你以前創建的Session對象返回

當你創建一個Pool,它會在工人方法。 (請注意,啓動__init__中的工作進程沒有任何根本性的要求,一個同等有效的實現可能會等到工人首次需要啓動之前,這會在您的示例中表現出不同的行爲)。發生這種情況時(在Unix上),父級進程爲每個工作進程分叉本身,這涉及操作系統將當前正在運行的進程的內存複製到一個新進程中,因此,您將在相同的位置完全相同地獲得完全相同的對象。

把他們倆撮合在一起,在第一個例子中,你正在創建一個Session分叉之前,這將會創建Pool的過程中被複制到所有的工作進程,導致在相同的身份,而在第二個例子中,你耽誤創建Session對象,直到工作進程啓動,導致不同的身份。

要注意的是,雖然Session對象共享相同的id,他們相同的對象,在這個意義上,如果您要更改父進程的Session話,他們將不會在反映這一點很重要子進程。由於分叉,它們恰巧都共享相同的內存地址。 但是,操作系統級別的資源(如連接)共享爲,所以如果您在Pool()之前在session上運行了查詢,則會在連接池中爲您創建連接,並隨後分入子進程。如果您嘗試在子進程中執行查詢,則會遇到奇怪的錯誤,因爲您的進程正在通過同一確切連接互相扯皮!

以上是對Windows的爭論,因爲Windows沒有fork()

+0

謝謝!你的解釋非常清楚。但我認爲fork是在這裏做的:'pool.map_async(process_feed,[1,2])。get()' –

+0

@ Overflow012是的,這是一個同樣有效的假設,因爲文檔沒有規定這種或那種方式,但是,作爲實現細節,它在'Pool .__ init__'中發生[https://github.com/python/cpython/blob/5affd23e6f42125998724787025080a24839266e/Lib/multiprocessing/pool.py#L174]。 – univerio