2009-08-25 88 views
8

我正在使用SQLAlchemy和Postgres後端來執行批量插入或更新。爲了提高性能,我試圖每千行左右只提交一次:如何有效地使用SQLAlchemy進行批量插入或更新?

trans = engine.begin() 
    for i, rec in enumerate(records): 
    if i % 1000 == 0: 
     trans.commit() 
     trans = engine.begin() 
    try: 
     inserter.execute(...) 
    except sa.exceptions.SQLError: 
     my_table.update(...).execute() 
trans.commit() 

但是,這是行不通的。看起來,當INSERT失敗時,它使事情處於一種奇怪的狀態,阻止UPDATE發生。它會自動回滾事務嗎?如果是這樣,可以停止嗎?我不希望我的整個事務在發生問題時回滾,這就是爲什麼我試圖首先捕獲異常。我得到的錯誤信息是「sqlalchemy.exc.InternalError:(InternalError)當前事務被中止,命令被忽略,直到事務塊結束」,並且它發生在update()。execute() )電話。

回答

5

您遇到了一些奇怪的Postgresql特定行爲:如果在事務中發生錯誤,它會強制整個事務回滾。我認爲這是一個Postgres設計錯誤;在某些情況下,需要使用相當多的SQL扭曲。

一種解決方法是首先執行UPDATE。通過查看cursor.rowcount來檢測它是否實際修改了一行;如果它沒有修改任何行,它不存在,INSERT也是如此。 (這將是,如果你更新的頻率比你插入當然,速度更快。)

另一種解決方法是使用保存點:

SAVEPOINT a; 
INSERT INTO ....; 
-- on error: 
ROLLBACK TO SAVEPOINT a; 
UPDATE ...; 
-- on success: 
RELEASE SAVEPOINT a; 

這對生產質量的代碼一個嚴重的問題:你必須準確檢測錯誤。據推測,你期望能夠進行獨特的約束檢查,但是你可能會遇到意想不到的事情,並且可能幾乎不可能從可預料的錯誤中可靠地區分出預期的錯誤。如果這個錯誤條件不正確,它會導致模糊的問題,沒有任何東西會被更新或插入,並且不會出現錯誤。對此非常小心。您可以通過查看Postgresql的錯誤代碼來確定錯誤類型的範圍,以確保它是您所期望的錯誤類型,但潛在的問題仍然存在。

最後,如果您真的想要批量插入或更新,您實際上希望在幾個命令中執行其中的很多命令,而不是每個命令一個項目。這需要更復雜的SQL:SELECT嵌套在INSERT中,過濾掉要插入和更新的正確項目。

+1

「如果在事務中發生錯誤,它會強制整個事務回滾,我認爲這是Postgres設計錯誤。」 - 這不是交易點嗎?來自[Wikipedia](http://en.wikipedia。org/wiki/Database_transaction):「交易提供了一個」全有或全無「的主張,指出在數據庫中執行的每個工作單元必須完整或完全沒有任何影響。」 – spiffytech 2014-01-11 21:47:03

+0

@Spiffytech回覆良好。這實際上使我LOL。 – 2014-02-02 16:47:33

4

這個錯誤來自於PostgreSQL。如果一個命令創建錯誤,PostgreSQL不允許您在同一事務中執行命令。要解決這個問題,你可以通過conn.begin_nested()使用嵌套事務(使用SQL保存點實現)。繼承人可能會發揮作用。我讓代碼使用顯式連接,將分塊部分分解出來,並使代碼使用上下文管理器正確管理事務。

from itertools import chain, islice 
def chunked(seq, chunksize): 
    """Yields items from an iterator in chunks.""" 
    it = iter(seq) 
    while True: 
     yield chain([it.next()], islice(it, chunksize-1)) 

conn = engine.commit() 
for chunk in chunked(records, 1000): 
    with conn.begin(): 
     for rec in chunk: 
      try: 
       with conn.begin_nested(): 
        conn.execute(inserter, ...) 
      except sa.exceptions.SQLError: 
       conn.execute(my_table.update(...)) 

儘管由於嵌套的事務開銷,這仍然不會有很好的性能。如果您希望獲得更好的性能,請嘗試使用select查詢事先檢測哪些行將創建錯誤,並使用executemany支持(如果所有插入使用相同的列,則可以執行一個dicts列表)。如果你需要處理併發更新,你仍然需要通過重試或逐個回退來進行錯誤處理。

相關問題