不是你正在尋找的答案,因爲它沒有解決嘗試指示SQLAlchemy使用psycopg臨時表,並且需要 - 有點 - 手動SQL,但是:你可以從引擎訪問基礎psycopg連接與raw_connection()
,其中允許使用COPY FROM:
import io
import csv
from psycopg2 import sql
def bulk_copy(engine, table, values):
csv_file = io.StringIO()
headers = list(values[0].keys())
writer = csv.DictWriter(csv_file, headers)
writer.writerows(values)
csv_file.seek(0)
# NOTE: `format()` here is *not* `str.format()`, but
# `SQL.format()`. Never use plain string formatting.
copy_stmt = sql.SQL("COPY {} (" +
",".join(["{}"] * len(headers)) +
") FROM STDIN CSV").\
format(sql.Identifier(str(table.name)),
*(sql.Identifier(col) for col in headers))
# Fetch a raw psycopg connection from the SQLAlchemy engine
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.copy_expert(copy_stmt, csv_file)
except Exception as e:
conn.rollback()
raise e
else:
conn.commit()
finally:
conn.close()
然後
bulk_copy(engine, SimpleTable.__table__, values)
相比,執行INSERT語句這應該是足夠快。在這臺機器上移動600,000條記錄大約需要8秒,約13μs/記錄。您也可以使用原始連接和光標與extras軟件包。
我剛剛建議使用'SimpleTable .__ table __。insert()。values(values)',它會編譯成一個帶有多個VALUES元組的單個INSERT語句,但事實證明它實際上在我的機器上更慢。編譯本身與使用依賴於'executemany()'的方法一樣慢。 –