2017-01-26 274 views
1

我目前正在使用SQLAlchemy(在GAE上連接到Google的雲MySQL)編寫Web應用程序(Flask)並需要對錶進行批量更新。總之,進行了一些計算,導致需要在1000個對象上更新單個值。目前我正在做一切交易,但最後還是要花費很多時間。SQLAlchemy批量更新策略

該表的索引號爲id,這些都是在單個事務中執行的。所以我相信我避免了通常的錯誤,但仍然非常緩慢。

INFO  2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s 
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656} 

從我的理解是沒有辦法做到在SQL實際上是一個批量更新,以上聲明最終被多個UPDATE語句被髮送到服務器。

我試過使用Session.bulk_update_mappings(),但似乎並沒有真正做任何事情:(不知道爲什麼,但更新從未真正發生過,我看不到任何實際使用的方法的實例(包括在性能套件)所以不知道它是否打算使用。

One technique I've seen discussed正在做一個批量插入到另一個表,然後做一個UPDATE JOIN。我已經給它一個測試,就像下面,它似乎是明顯更快。

wallets = db_session.query(Wallet).all() 
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ] 
db_session.bulk_save_objects(ledgers) 
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
db_session.execute('TRUNCATE ledger') 

但現在的問題是如何構建我的代碼。我使用ORM和我需要以某種方式不「骯髒」原始Wallet對象,以便他們不會以舊的方式承諾。我可以創建這些Ledger對象,並保留它們的列表,然後在批量操作結束時手動插入它們。但是,這幾乎聞起來像我複製ORM機制的一些工作。

有沒有更聰明的方法來做到這一點?到目前爲止,我的大腦正在下降,是這樣的:正如我所說的

class Wallet(Base): 
    ... 
    _balance = Column(Float) 
    ... 

@property 
def balance(self): 
    # first check if we have a ledger of the same id 
    # and return the amount in that, otherwise... 
    return self._balance 

@balance.setter 
def balance(self, amount): 
    l = Ledger(id=self.id, amount=amount) 
    # add l to a list somewhere then process later 

# At the end of the transaction, do a bulk insert of Ledgers 
# and then do an UPDATE JOIN and TRUNCATE 

,這一切似乎是對抗我(可能)擁有的工具。有沒有更好的方式來處理這個問題?我可以利用ORM機制來做到這一點嗎?或者是否有更好的方式來進行批量更新?

編輯:或者是否有事情和會議巧妙的事情?也許before_flush?

編輯2:所以我試圖進軍事件機器,現在有這樣的:

@event.listens_for(SignallingSession, 'before_flush') 
def before_flush(session, flush_context, instances): 
    ledgers = [] 

    if session.dirty: 
     for elem in session.dirty: 
      if (session.is_modified(elem, include_collections=False)): 
       if isinstance(elem, Wallet): 
        session.expunge(elem) 
        ledgers.append(Ledger(id=elem.id, amount=elem.balance)) 

    if ledgers: 
     session.bulk_save_objects(ledgers) 
     session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
     session.execute('TRUNCATE ledger') 

這似乎很哈克和邪惡的我,但似乎工作確定。任何陷阱或更好的方法?

-Matt

回答

2

你實質上是在繞過ORM來優化性能。因此,不要驚訝於「複製ORM正在做的工作」,因爲這正是您需要做的。

除非你有很多地方需要做這樣的批量更新,否則我會建議不要使用神奇事件方法;簡單地寫明確的查詢要簡單得多。

我的建議是使用SQLAlchemy的核心,而不是ORM做更新做:

ledger = Table("ledger", db.metadata, 
    Column("wallet_id", Integer, primary_key=True), 
    Column("new_balance", Float), 
    prefixes=["TEMPORARY"], 
) 


wallets = db_session.query(Wallet).all() 

# figure out new balances 
balance_map = {} 
for w in wallets: 
    balance_map[w.id] = calculate_new_balance(w) 

# create temp table with balances we need to update 
ledger.create(bind=db.session.get_bind()) 

# insert update data 
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v} 
              for k, v in balance_map.items()]) 

# perform update 
db.session.execute(Wallet.__table__ 
         .update() 
         .values(balance=ledger.c.new_balance) 
         .where(Wallet.__table__.c.id == ledger.c.wallet_id)) 

# drop temp table 
ledger.drop(bind=db.session.get_bind()) 

# commit changes 
db.session.commit() 
+0

是的,我認爲我在這裏對ORM的攻擊太多了。問題是我已經在大多數情況下使用了ORM,所以需要進行相當多的重寫才能以這種方式獲得它,以便我可以完成上面的表單。在我的用例'calculate_new_balance()'中,將取決於以前計算的值(這個用例通過網絡傳播資金),所以我將不得不最終同時查看模型和分類帳以確定具有正確的價值。但是,謝謝,這無疑給我留下了深刻的印象! –

0

一般來說是架構設計窮人需要頻繁更新數千行。這且不說...

計劃A:編寫生成

START TRANSACTION; 
UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
... 
COMMIT; 

B計劃ORM代碼:編寫生成

CREATE TEMPORARY TABLE ToDo (
    id ..., 
    new_balance ... 
); 
INSERT INTO ToDo -- either one row at a time, or a bulk insert 
UPDATE wallet 
    JOIN ToDo USING(id) 
    SET wallet.balance = ToDo.new_balance; -- bulk update 

ORM代碼(檢查語法,測試等等)

+0

謝謝:)我知道這可能是糟糕的模式設計,但事實是我需要以某種方式更新該程序的每次迭代1000行。計劃A是我最初的計劃,但實在太慢了。基於上面univerio的代碼和'before_flush'監聽器的組合,B計劃現在是我最終得到的。好的是,禁用監聽器,我有默認行爲(計劃A),並啓用它我得到優化的更新(計劃B) –

+0

關於計劃B(你的分類賬)的警告 - 如果有人可以寫的時間之間的時間更新完成和你的'TRUNATE',你可能會失去一個條目。如果你可以把你的代碼翻譯成SQL,我可以詳細說明。 –