如何在pymongo中執行批量上插?我想更新一堆條目,並且一次只做一條,速度很慢。pymongo中的快速或批量上插
這個問題的答案几乎相同的問題就在這裏:Bulk update/upsert in MongoDB?
接受的答案實際上並沒有回答這個問題。它只是提供了一個鏈接到mongo CLI進行導入/導出。
我也將開放給別人解釋爲什麼做一個散裝UPSERT是沒有可能的/沒有最好的做法,但我們解釋一下這種問題的首選解決方案是什麼。
謝謝!
如何在pymongo中執行批量上插?我想更新一堆條目,並且一次只做一條,速度很慢。pymongo中的快速或批量上插
這個問題的答案几乎相同的問題就在這裏:Bulk update/upsert in MongoDB?
接受的答案實際上並沒有回答這個問題。它只是提供了一個鏈接到mongo CLI進行導入/導出。
我也將開放給別人解釋爲什麼做一個散裝UPSERT是沒有可能的/沒有最好的做法,但我們解釋一下這種問題的首選解決方案是什麼。
謝謝!
答案是一樣的:散裝upserts不支持。
,您可以更新使用多=真符合您查詢指定的所有文件。
有一個關於做一個批處理命令你所希望的方式錯誤here。
的MongoDB 2.6+有批量操作的支持。這包括批量插入,upserts,更新等。這樣做的目的是減少/消除按記錄操作(按文檔逐個記錄)的往返等待時間的延遲是正確的。
那麼,如何工作的呢?在Python的例子,因爲這是我參與的工作。
>>> import pymongo
>>> pymongo.version
'2.7rc0'
要使用此功能,我們創建了一個「散」對象,文檔添加到它,然後調用執行它,它會發送所有更新立刻。注意事項:所收集操作的BSON大小(bsonsizes的總和)不得超過16 MB的文檔大小限制。當然,操作次數因此可能會有很大差異,您的里程可能會有所不同。
實施例的散裝UPSERT操作Pymongo:
import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3})
retval = bulkop.execute()
這是必不可少的方法。更多信息請訪問:
是否有任何方法可以避免在每個操作中查找?考慮我想插入一個嵌入到文檔中的json對象列表,這樣,我必須每次都找到這個文檔並插入,這看起來效率不高。 – SpiXel 2016-07-11 10:48:48
這只是MongoDB寫作的方式,而不是我的想法。在任何upsert或更新中,您必須找到要更新的文檔,並且該查找指定要採取哪些操作。此外,它應該對大量行爲有效,但不一定針對您的特定應用程序,因此,正如他們所說,您的里程可能會有所不同。 – 2016-07-21 16:00:08
'initialize_ordered_bulk_op()'語法現在已被棄用:http://api.mongodb.com/python/current/changelog.html?highlight=initialize_ordered_bulk_op – duhaime 2018-01-21 15:58:53
僅在pymongo 2.7+
pymongo的現代版本(比3.X以上)在降級其中一個一致的界面包裹批量操作服務器版本不支持批量操作。這在MongoDB官方支持的驅動程序中現在是一致的。
因此,編碼的首選方法是使用bulk_write()
,而不是使用UpdateOne
其他適當的操作操作。現在當然,最好使用自然語言列表,而不是一個具體的建設者
舊機制的文檔的直接翻譯:
from pymongo import UpdateOne
operations = [
UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]
result = collection.bulk_write(operations)
或者經典的文檔轉換循環:
import random
from pymongo import UpdateOne
random.seed()
operations = []
for doc in collection.find():
# Set a random number on every document update
operations.append(
UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
)
# Send once every 1000 in batch
if (len(operations) == 1000):
collection.bulk_write(operations,ordered=False)
operations = []
if (len(operations) > 0):
collection.bulk_write(operations,ordered=False)
返回的結果爲BulkWriteResult
,其中將包含匹配和更新文檔的計數器以及發生的任何「upserts」的返回_id
值。
對於批量操作數組的大小存在一些誤解。發送到服務器的實際請求不能超過16MB BSON限制,因爲該限制也適用於發送到使用BSON格式的服務器的「請求」。
但是,這並不能控制您可以構建的請求數組的大小,因爲實際操作只能以批處理方式發送和處理1000。唯一真正的限制是那1000條操作指令本身並不實際創建大於16MB的BSON文檔。這確實是一個很高的命令。
批量方法的一般概念是「較少流量」,因爲一次發送很多東西,只處理一個服務器響應。減少每個更新請求的開銷可以節省大量時間。
不好意思,但我不明白,爲什麼你把你的批次分成你的循環,如果你知道你的司機自己做。我的意思是在你的例子中,有1000個'UpdateOne({「_id」:doc [「_ id」]},{「$ set」:{「random」:random.randint(0,10)}})'比16 Mb,這是顯而易見的,因此pymongo可以拆分您的數據,並且請求將會成功。你爲什麼用手分割數據? – 2017-04-27 14:54:53
@Budulianin,因爲它迭代了一個遊標,它是非常多的反模式將整個集合加載到內存中。這基本上是爲什麼他們在合理的大小批量。此外,這需要重新考慮,特別是考慮到異步環境(基本技術保持不變),您基本上可以通過網絡發送數據並在等待確認時構建另一批。如果你認爲你通過「一個大批量」而不是「幾個合理的」獲得了某些東西,那麼在大多數情況下,我會說這不太可能 – 2017-06-30 12:24:08
與Python 3.5 +最快的批量更新,電機和ASYNCIO:
import asyncio
import datetime
import logging
import random
import time
import motor.motor_asyncio
import pymongo.errors
async def execute_bulk(bulk):
try:
await bulk.execute()
except pymongo.errors.BulkWriteError as err:
logging.error(err.details)
async def main():
cnt = 0
bulk = db.initialize_unordered_bulk_op()
tasks = []
async for document in db.find({}, {}, no_cursor_timeout=True):
cnt += 1
bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
if not cnt % 1000:
task = asyncio.ensure_future(execute_bulk(bulk))
tasks.append(task)
bulk = db.initialize_unordered_bulk_op()
if cnt % 1000:
task = asyncio.ensure_future(bulk.execute(bulk))
tasks.append(task)
logging.info('%s processed', cnt)
await asyncio.gather(*tasks)
logging.basicConfig(level='INFO')
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
execution_time = time.time() - start_time
logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
爲什麼模1000?我認爲mongo批量操作「自動」執行2000年的操作... – duhaime 2017-03-16 14:23:43
@duhaime很好的問題。我可以參考[文檔](https://docs.mongodb.com/manual/reference/limits/#Bulk-Operation-Size),但看起來像[BSON的16MB限制](http://stackoverflow.com/ a/24238349/4249707)將會更精確。不幸的是,我不知道如何衡量請求的大小 – 2017-03-16 19:31:56
@duhaime順便說一句,根據最新的電機[文檔](http://motor.readthedocs.io/en/stable/examples/bulk.html#bulk-insert)我們可以根本不用擔心 – 2017-03-16 19:38:05
如果你有很多的數據,你想,如果存在數據用「_id」的判斷,
你可以試試...
import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']
collectionInfo = db.sample
#sample data
datas=[
{"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
{"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
{"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
{"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
{"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
{"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]
#you should split judge item and other data
ids=[data.pop("_id") for data in datas]
operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]
collectionInfo.bulk_write(operations)
我的英語很差,如果你不明白我說什麼,我很抱歉
這是很可悲的。 – ComputationalSocialScience 2011-03-14 16:44:04
這不再是這種情況。看凱文的答案。 – 2016-04-23 05:33:37
Bulk.find.upsert()https://docs.mongodb.com/manual/reference/method/Bulk.find.upsert/index.html – Tanuj 2017-08-25 18:49:40