0
作爲數據分析的一部分,我收集了需要存儲在Elasticsearch中的記錄。截至目前,我將記錄收集在一箇中間清單中,然後我通過bulk update進行編寫。對elasticsearch進行流式處理和批量更新
雖然這可行,但當記錄數量太大而不適合記憶時,它有其侷限性。因此,我想知道是否可以使用「流」機制,這將允許
- 持續打開一個連接到elasticsearch
- 在塊狀的方式
我不斷地更新理解我可以簡單地打開一個連接到Elasticsearch並且隨着數據可用而經典地更新,但是這大約慢了10倍,所以我想保持批量機制:
import elasticsearch
import elasticsearch.helpers
import elasticsearch.client
import random
import string
import time
index = "testindexyop1"
es = elasticsearch.Elasticsearch(hosts='elk.example.com')
if elasticsearch.client.IndicesClient(es).exists(index=index):
ret = elasticsearch.client.IndicesClient(es).delete(index=index)
data = list()
for i in range(1, 10000):
data.append({'hello': ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))})
start = time.time()
# this version takes 25 seconds
# for _ in data:
# res = es.bulk(index=index, doc_type="document", body=_)
# and this one - 2 seconds
elasticsearch.helpers.bulk(client=es, index=index, actions=data, doc_type="document", raise_on_error=True)
print(time.time()-start)
是的,這是一個很好的解決方案 - 但它以某種方式重寫了流式傳輸功能。我正在尋找可能內置的東西。 – WoJ 2015-04-02 11:34:14
你看過嗎? http://elasticsearch-py.readthedocs.org/en/latest/helpers.html#elasticsearch.helpers.streaming_bulk – 2015-04-02 11:39:48