
SQLAlchemy ThreadPoolExecutor "too many clients"

I wrote a script with the logic below to insert many records into a PostgreSQL table as they are generated.

#!/usr/bin/env python3 
import asyncio 
from concurrent.futures import ProcessPoolExecutor as pool 
from functools import partial 

import sqlalchemy as sa 
from sqlalchemy.ext.declarative import declarative_base 


metadata = sa.MetaData(schema='stackoverflow') 
Base = declarative_base(metadata=metadata) 


class Example(Base): 
    __tablename__ = 'example' 
    pk = sa.Column(sa.Integer, primary_key=True) 
    text = sa.Column(sa.Text) 


sa.event.listen(Base.metadata, 'before_create', 
    sa.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow')) 

engine = sa.create_engine(
    'postgresql+psycopg2://postgres:[email protected]:5432/stackoverflow' 
) 
Base.metadata.create_all(engine) 
session = sa.orm.sessionmaker(bind=engine, autocommit=True)() 


def task(value): 
    engine.dispose() 
    with session.begin(): 
        session.add(Example(text=value))


async def infinite_task(loop): 
    spawn_task = partial(loop.run_in_executor, None, task) 
    while True: 
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main(): 
    loop = asyncio.get_event_loop() 
    with pool() as executor: 
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
        loop.close()


if __name__ == '__main__': 
    main() 

This code works just fine, spawning as many processes in the pool as I have CPU cores and happily chugging along forever. I wanted to see how threads compare to processes, but I could not get a working example. Here are the changes I made:

from concurrent.futures import ThreadPoolExecutor as pool 

session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True) 
Session = sa.orm.scoped_session(session_maker) 


def task(value): 
    engine.dispose() 
    # create new session per thread 
    session = Session() 
    with session.begin(): 
        session.add(Example(text=value))
    # remove session once the work is done 
    Session.remove() 

This version runs for a while before flooding the console with "too many clients" exceptions:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already 

What am I missing?

Answers


It turns out the problem was engine.dispose(), which, in the words of Mike Bayer (zzzeek), leaves PG connections lying open to be garbage collected.

Source: https://groups.google.com/forum/#!topic/sqlalchemy/zhjCBNebnDY

So the updated task function looks like this:

def task(value): 
    # create new session per thread 
    session = Session() 
    with session.begin(): 
        session.add(Example(text=value))
    # remove session object once the work is done 
    session.remove() 

@txomon: it should be 'Session.remove()', not 'session.remove()' (I tried to edit but was rejected). The reason is that the 'scoped_session' factory object is what has the 'remove()' method: you are asking the factory to dereference the current session from its registry; the session does not remove itself. See: http://docs.sqlalchemy.org/en/latest/orm/contextual.html – cowbert
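For reference, here is a sketch of the task function with the one-character fix that comment describes (and which the question's own threaded version already used), calling remove() on the scoped_session registry rather than on the session instance:

def task(value):
    # create a new session for this thread via the scoped_session registry
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # dereference this thread's session on the registry once the work is done
    Session.remove()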


It looks like you are opening a lot of new connections without closing them; try adding an engine.dispose() after the work is done:

from concurrent.futures import ThreadPoolExecutor as pool 

session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True) 
Session = sa.orm.scoped_session(session_maker) 


def task(value): 
    engine.dispose() 
    # create new session per thread 
    session = Session() 
    with session.begin(): 
        session.add(Example(text=value))
    # remove session once the work is done 
    Session.remove() 
    engine.dispose() 

Keep in mind the cost of a new connection: ideally you should have one connection per process/thread, but I am not sure how ThreadPoolExecutor works, and the connections may not be closed when a thread finishes executing.
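Another way to keep the total connection count bounded is to cap the engine's own connection pool. This is a minimal sketch using create_engine's pool_size/max_overflow/pool_timeout options; the values shown are illustrative, not tuned:

import sqlalchemy as sa

# Cap the engine's QueuePool so all threads share a bounded set of connections.
engine = sa.create_engine(
    'postgresql+psycopg2://postgres:[email protected]:5432/stackoverflow',
    pool_size=5,      # connections kept open in the pool (illustrative)
    max_overflow=0,   # never open extra connections beyond pool_size
    pool_timeout=30,  # seconds to wait for a free connection before raising
)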


Unfortunately, the extra call to 'engine.dispose()' makes no difference. – user1475412


@user1475412: could your initial version of the code work with 'pool(max_workers=NUMBER_OF_CORES)' as the executor? As I read in the docs, "If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5", so on an 8 HT-core system you get 40 threads, while the default connection limit in PostgreSQL is 100. – icuken
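A sketch of what that comment suggests, applied to the question's main() (max_workers=5 is just an example value; the point is to keep it well below PostgreSQL's max_connections):

from concurrent.futures import ThreadPoolExecutor as pool

def main():
    loop = asyncio.get_event_loop()
    # Bound the worker thread count explicitly so the number of concurrent
    # sessions stays well below PostgreSQL's connection limit.
    with pool(max_workers=5) as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
        loop.close()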


I re-ran the code with 'pool(max_workers=5)' but ran into the same problem. – user1475412