2017-04-10 69 views
2

我想將大量條目(〜600k)上傳到PostgreSQL數據庫中的一個簡單表中,每個條目有一個外鍵,一個時間戳和3個浮點數。但是,每個條目需要60毫秒才能執行描述爲here的核心批量插入,因此整個執行需要10小時。我已經發現,這是一個性能問題executemany()方法,但是它已經通過中的execute_values()方法得到解決。如何在sqlalchemy中使用psycopg2.extras?

我運行的代碼如下:

#build a huge list of dicts, one dict for each entry 
engine.execute(SimpleTable.__table__.insert(), 
       values) # around 600k dicts in a list 

我看,這是一個普遍的問題,但我還沒有設法找到在SQLAlchemy中本身就是一個解決方案。有什麼辦法可以告訴sqlalchemy在某些場合撥打execute_values()?有沒有其他的方式來實現巨大的插入,而不需要自己構建SQL語句?

感謝您的幫助!

+1

我剛剛建議使用'SimpleTable .__ table __。insert()。values(values)',它會編譯成一個帶有多個VALUES元組的單個INSERT語句,但事實證明它實際上在我的機器上更慢。編譯本身與使用依賴於'executemany()'的方法一樣慢。 –

回答

1

不是你正在尋找的答案,因爲它沒有解決嘗試指示SQLAlchemy使用psycopg臨時表,並且需要 - 有點 - 手動SQL,但是:你可以從引擎訪問基礎psycopg連接與raw_connection(),其中允許使用COPY FROM

import io 
import csv 
from psycopg2 import sql 

def bulk_copy(engine, table, values): 
    csv_file = io.StringIO() 
    headers = list(values[0].keys()) 
    writer = csv.DictWriter(csv_file, headers) 
    writer.writerows(values) 

    csv_file.seek(0) 

    # NOTE: `format()` here is *not* `str.format()`, but 
    # `SQL.format()`. Never use plain string formatting. 
    copy_stmt = sql.SQL("COPY {} (" + 
         ",".join(["{}"] * len(headers)) + 
         ") FROM STDIN CSV").\ 
     format(sql.Identifier(str(table.name)), 
       *(sql.Identifier(col) for col in headers)) 

    # Fetch a raw psycopg connection from the SQLAlchemy engine 
    conn = engine.raw_connection() 
    try: 
     with conn.cursor() as cur: 
      cur.copy_expert(copy_stmt, csv_file) 

    except Exception as e: 
     conn.rollback() 
     raise e 

    else: 
     conn.commit() 

    finally: 
     conn.close() 

然後

bulk_copy(engine, SimpleTable.__table__, values) 

相比,執行INSERT語句這應該是足夠快。在這臺機器上移動600,000條記錄大約需要8秒,約13μs/記錄。您也可以使用原始連接和光標與extras軟件包。