2014-11-14 110 views
0

我試圖使用multiprocessing模塊更新數據庫(異步方式)上的一行。我的代碼有一個簡單的函數create_member,它在表上插入一些數據,然後創建一個可能會改變這些數據的進程。問題是,傳遞給async_create_member會話關閉數據庫連接,以及接下來的徵用我得到psycopg的錯誤:Python SQLAlchemy更新Postgres記錄

(Interface Error) connection already closed 

下面的代碼:

def create_member(self, data): 
    member = self.entity(**data) 
    self.session.add(member) 
    for name in data: 
     setattr(member, name, data[name]) 
    self.session.commit() 
    self.session.close() 
    if self.index.is_indexable: 
     Process(target=self.async_create_member, 
      args=(data, self.session)).start() 
    return member 

def async_create_member(self, data, session): 
    ok, data = self.index.create(data) 
    if ok: 

     datacopy = data.copy() 
     data.clear() 
     data['document'] = datacopy['document'] 
     data['dt_idx'] = datacopy['dt_idx'] 
     stmt = update(self.entity.__table__).where(
      self.entity.__table__.c.id_doc == datacopy['id_doc'])\ 
      .values(**data) 

     session.begin() 
     session.execute(stmt) 
     session.commit() 
     session.close() 

我可能通過創建一個解決這個問題在async_create_member新引黃,但這留下的Postgres太多idle交易:

engine = create_new_engine() 
conn = engine.connect() 
conn.execute(stmt) 
conn.close() 

什麼建議立即進行刪除我現在做什麼?有沒有辦法解決第一個代碼?或者我應該不斷創建與create_new_engine函數的新連接?我應該使用線程還是進程?

+0

我相信如果你刪除'self.session.close()'在create_member功能它應該工作 – user2097159 2014-11-14 18:52:14

+0

已經嘗試過。沒有成功 – user1538560 2014-11-14 19:33:08

+0

嗯,這很奇怪,你應該得到這個異常的唯一原因是因爲你關閉了與服務器的連接。我也認爲你不需要begin()。我假設你在session.execute(stmt)行中出現這個錯誤? – user2097159 2014-11-14 19:38:16

回答

-1

您不能在線程或進程之間重用會話。 Sessions aren't thread safe,並且會話底層的連接不會在進程間乾淨地繼承。如果沒有信息,您收到的錯誤消息是準確的:如果嘗試在跨進程邊界繼承數據庫後使用它,則數據庫連接確實是關閉的。

在大多數情況下,您應該爲multiprocessing設置中的每個進程創建一個會話。

如果您的問題符合下列條件:

  • 你正在做大量的CPU密集型處理的每個對象
  • 數據庫的寫入操作相比相對輕巧
  • 要使用大量的(我在8臺核心機器上執行此操作)

創建一個擁有會話的單個寫入程序進程並將該對象傳遞給該程序可能是值得的處理。下面是它通常對我的作品(注:並不意味着是可運行的代碼):

import multiprocessing 
from your_database_layer import create_new_session, WhateverType 

work = multiprocessing.JoinableQueue() 

def writer(commit_every = 50): 
    global work 
    session = create_new_session() 
    counter = 0 

    while True: 
     item = work.get() 
     if item is None: 
      break 

     session.add(item) 
     counter += 1 
     if counter % commit_every == 0: 
      session.commit() 

     work.task_done() 

    # Last DB writes 
    session.commit() 

    # Mark the final None in the queue as complete 
    work.task_done() 
    return 


def very_expensive_object_creation(data): 
    global work 
    very_expensive_object = WhateverType(**data) 
    # Perform lots of computation 
    work.put(very_expensive_object) 
    return 


def main(): 
    writer_process = multiprocessing.Process(target=writer) 
    writer_process.start() 

    # Create your pool that will feed the queue here, i.e. 
    workers = multiprocessing.Pool() 
    # Dispatch lots of work to very_expensive_object_creation in parallel here 
    workers.map(very_expensive_object_creation, some_iterable_source_here) 
    # --or-- in whatever other way floats your boat, such as 
    workers.apply_async(very_expensive_object_creation, args=(some_data_1,)) 
    workers.apply_async(very_expensive_object_creation, args=(some_data_2,)) 
    # etc. 

    # Signal that we won't dispatch any more work 
    workers.close() 

    # Wait for the creation work to be done 
    workers.join() 

    # Trigger the exit condition for the writer 
    work.put(None) 

    # Wait for the queue to be emptied 
    work.join() 

    return 
+0

有一個工作者公共變量和模塊級別可能導致內存泄漏,並且您正在使用它來標記'task_done()'可能卡住查詢。我不知道這是一個可以接受的答案 – 2017-08-16 09:19:28