2012-04-25 53 views
28

我的應用程序正在使用作用域會話和SQLALchemy的聲明式樣式。這是一個web應用程序,許多數據庫插入由Celery執行,一個任務調度程序。在插入SQLAlchemy(聲明式)時處理重複的主鍵

通常情況下,決定插入的對象時,我的代碼可能會做大致如下的內容:

from schema import Session 
from schema.models import Bike 

pk = 123 # primary key 
bike = Session.query(Bike).filter_by(bike_id=pk).first() 
if not bike: # no bike in DB 
    new_bike = Bike(pk, "shiny", "bike") 
    Session.add(new_bike) 
    Session.commit() 

這裏的問題是,由於很多,這是通過異步的工人來做,有可能爲一個雖然插入Bikeid=123,而另一個正在檢查其存在,但工作中途中斷。在這種情況下,第二個worker將嘗試使用相同的主鍵插入一行,並且SQLAlchemy將引發一個IntegrityError

我不能爲我的生活找到一個很好的方式,從換出Session.commit()除了解決這個問題:

'''schema/__init__.py''' 
from sqlalchemy.orm import scoped_session, sessionmaker 
Session = scoped_session(sessionmaker()) 

def commit(ignore=False): 
    try: 
     Session.commit() 
    except IntegrityError as e: 
     reason = e.message 
     logger.warning(reason) 

     if not ignore: 
      raise e 

     if "Duplicate entry" in reason: 
      logger.info("%s already in table." % e.params[0]) 
      Session.rollback() 

然後我到處有Session.commit我現在有schema.commit(ignore=True)哪裏我不不介意該行不再被插入。

對我來說這似乎非常脆弱,因爲字符串檢查。正如一個供參考,當IntegrityError提高它看起來像這樣:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'") 

所以當然是我被插入主鍵是像Duplicate entry is a cool thing話,我想我可能會錯過IntegrityError的這實際上不是因爲重複的主鍵。

是否有更好的方法,它保持我用乾淨的SQLAlchemy的方法(而不是開始寫出來的字符串等語句。)

Db的是MySQL的(儘管單元測試我喜歡使用SQLite,並不想用任何新方法來阻止這種能力)。

乾杯!

+3

你爲什麼不考慮使用自動遞增爲您生成主鍵?那麼你不必擔心這個問題。 還是有沒有這樣做的具體原因? – mata 2012-04-25 19:40:39

+0

有一個特定的原因(對不起,這個例子有點瑣碎)。 – Edwardr 2012-04-25 19:50:37

回答

6

您應該以相同的方式處理每個IntegrityError:回滾事務,並且可以再次嘗試。一些數據庫甚至不會讓你在IntegrityError之後做更多的事情。你也可以在表上獲得鎖,或者在數據庫允許的情況下獲得更細粒度的鎖,在兩個衝突的事務開始時。

使用with語句顯式開始一個事務,並自動提交(或回滾上的任何異常):

from schema import Session 
from schema.models import Bike 

session = Session() 
with session.begin(): 
    pk = 123 # primary key 
    bike = session.query(Bike).filter_by(bike_id=pk).first() 
    if not bike: # no bike in DB 
     new_bike = Bike(pk, "shiny", "bike") 
     session.add(new_bike) 
+0

嗨。我不打算在同一時間安排插入和檢查。問題在於該對象碰巧是由兩個獨立的進程以臨時方式創建的。沒有什麼不愉快的事情,它只是應用程序的方式(事實上對象不是自行車,它們是*次*)。然而,你說的是運行單個工人。我正在研究如何指定單個工作人員管理所有與數據庫相關的任務,這將提供我需要的同步性。從應用程序插入不是一個選項。數據庫在遠程機器上,我需要100毫秒的Web-app響應。 – Edwardr 2012-04-26 09:03:23

+0

設計幾乎總是要歸咎於這些類型的SQL問題。例如,你確定你不能讓數據庫的主鍵自動遞增,並處理偶爾的'兩行'以前的主鍵列'結果? – joeforker 2012-04-26 15:15:10

+0

[對不起,我應該補充一點,PK沒有自動增量是有原因的]我只是不確定我是否同意。數據庫由許多其他應用程序共享,包括使用有問題的表格。爲什麼在你做了一些盡職調查之後,數據庫可能會插入我的另一個進程/應用程序/人類,這是不好的設計?關鍵是你必須在你的應用中處理這個問題。我的問題很簡單,我可以看到在SQLAlchemy中處理這個問題的唯一方法是通過字符串檢查,而且它看起來並不健壯。 – Edwardr 2012-04-26 21:53:59

3

我假設你的主鍵下面是一些自然的方式,這就是爲什麼你不能依靠正常的自動增量技術。所以,讓我們說這個問題真的是你需要插入的一個獨特的列,這是更常見的。

如果您想要「嘗試插入,在故障時部分回滾」,那麼您使用SAVEPOINT,它與SQLAlchemy是begin_nested()。下一個rollback()或commit()只作用於SAVEPOINT,而不是更大範圍的事情。

但是,總體來說,這裏的模式只是一個應該避免的模式。你真正想在這裏做的是三件事之一。 1。不要運行處理需要插入的相同密鑰的併發作業。 2.以某種方式同步正在使用的併發密鑰上的作業3.使用一些通用服務來生成這種特定類型的新記錄,由作業共享(或確保它們在作業運行之前都已設置好)。

如果你考慮一下,#2在任何情況下都會發生高度的隔離。開始兩個postgres會話。第1節:

test=> create table foo(id integer primary key); 
NOTICE: CREATE TABLE/PRIMARY KEY will create implicit index "foo_pkey" for table "foo" 
CREATE TABLE 
test=> begin; 
BEGIN 
test=> insert into foo (id) values (1); 

會議2:

test=> begin; 
BEGIN 
test=> insert into foo(id) values(1); 

你將看到的是,會話2塊,與PK#1的行被鎖定。我不確定MySQL是否足夠聰明來做到這一點,但這是正確的行爲。如果OTOH試圖插入不同的PK:

^CCancel request sent 
ERROR: canceling statement due to user request 
test=> rollback; 
ROLLBACK 
test=> begin; 
BEGIN 
test=> insert into foo(id) values(2); 
INSERT 0 1 
test=> \q 

它進行得很好,沒有阻塞。

問題是如果你正在做這種PK/UQ競爭,你的芹菜任務將會自行序列化無論如何,或者至少,他們應該是。

23

如果您使用session.merge(bike)而不是session.add(bike),那麼您將不會生成主鍵錯誤。 bike將根據需要進行檢索和更新或創建。

+6

如果你使用合併,你仍然可以得到完整性錯誤,如果你在不同的會話上同時進行兩次合併。 – Sjoerd 2015-09-17 09:57:38

+0

當會話適合內存時,這個答案很好,但對於較大的查詢不太好。所以如果你想添加比內存更多的數據,你不能只創建一堆會話併合並它們,對吧? – elplatt 2016-05-23 18:39:29

2

而不是session.add(obj)你需要使用下面提到的代碼,這將是更乾淨,你不需要像你所說的使用自定義提交功能。然而,這將忽略衝突,不僅對重複鍵而且對其他人也是如此。

的mysql:

self.session.execute(insert(self.table, values=values, prefixes=['IGNORE'])) 

sqlite的

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))