我目前正在使用SQLAlchemy(在GAE上連接到Google的雲MySQL)編寫Web應用程序(Flask)並需要對錶進行批量更新。總之,進行了一些計算,導致需要在1000個對象上更新單個值。目前我正在做一切交易,但最後還是要花費很多時間。SQLAlchemy批量更新策略
該表的索引號爲id
,這些都是在單個事務中執行的。所以我相信我避免了通常的錯誤,但仍然非常緩慢。
INFO 2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}
從我的理解是沒有辦法做到在SQL實際上是一個批量更新,以上聲明最終被多個UPDATE語句被髮送到服務器。
我試過使用Session.bulk_update_mappings()
,但似乎並沒有真正做任何事情:(不知道爲什麼,但更新從未真正發生過,我看不到任何實際使用的方法的實例(包括在性能套件)所以不知道它是否打算使用。
One technique I've seen discussed正在做一個批量插入到另一個表,然後做一個UPDATE JOIN。我已經給它一個測試,就像下面,它似乎是明顯更快。
wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')
但現在的問題是如何構建我的代碼。我使用ORM和我需要以某種方式不「骯髒」原始Wallet
對象,以便他們不會以舊的方式承諾。我可以創建這些Ledger
對象,並保留它們的列表,然後在批量操作結束時手動插入它們。但是,這幾乎聞起來像我複製ORM機制的一些工作。
有沒有更聰明的方法來做到這一點?到目前爲止,我的大腦正在下降,是這樣的:正如我所說的
class Wallet(Base):
...
_balance = Column(Float)
...
@property
def balance(self):
# first check if we have a ledger of the same id
# and return the amount in that, otherwise...
return self._balance
@balance.setter
def balance(self, amount):
l = Ledger(id=self.id, amount=amount)
# add l to a list somewhere then process later
# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE
,這一切似乎是對抗我(可能)擁有的工具。有沒有更好的方式來處理這個問題?我可以利用ORM機制來做到這一點嗎?或者是否有更好的方式來進行批量更新?
編輯:或者是否有事情和會議巧妙的事情?也許before_flush?
編輯2:所以我試圖進軍事件機器,現在有這樣的:
@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
ledgers = []
if session.dirty:
for elem in session.dirty:
if (session.is_modified(elem, include_collections=False)):
if isinstance(elem, Wallet):
session.expunge(elem)
ledgers.append(Ledger(id=elem.id, amount=elem.balance))
if ledgers:
session.bulk_save_objects(ledgers)
session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
session.execute('TRUNCATE ledger')
這似乎很哈克和邪惡的我,但似乎工作確定。任何陷阱或更好的方法?
-Matt
是的,我認爲我在這裏對ORM的攻擊太多了。問題是我已經在大多數情況下使用了ORM,所以需要進行相當多的重寫才能以這種方式獲得它,以便我可以完成上面的表單。在我的用例'calculate_new_balance()'中,將取決於以前計算的值(這個用例通過網絡傳播資金),所以我將不得不最終同時查看模型和分類帳以確定具有正確的價值。但是,謝謝,這無疑給我留下了深刻的印象! –