2015-04-02 91 views
0

作爲數據分析的一部分,我收集了需要存儲在Elasticsearch中的記錄。截至目前,我將記錄收集在一箇中間清單中,然後我通過bulk update進行編寫。對elasticsearch進行流式處理和批量更新

雖然這可行,但當記錄數量太大而不適合記憶時,它有其侷限性。因此,我想知道是否可以使用「流」機制,這將允許

  • 持續打開一個連接到elasticsearch
  • 在塊狀的方式

我不斷地更新理解我可以簡單地打開一個連接到Elasticsearch並且隨着數據可用而經典地更新,但是這大約慢了10倍,所以我想保持批量機制:

import elasticsearch 
import elasticsearch.helpers 
import elasticsearch.client 
import random 
import string 
import time 

index = "testindexyop1" 
es = elasticsearch.Elasticsearch(hosts='elk.example.com') 
if elasticsearch.client.IndicesClient(es).exists(index=index): 
    ret = elasticsearch.client.IndicesClient(es).delete(index=index) 

data = list() 
for i in range(1, 10000): 
    data.append({'hello': ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))}) 

start = time.time() 
# this version takes 25 seconds 
# for _ in data: 
#  res = es.bulk(index=index, doc_type="document", body=_) 

# and this one - 2 seconds 
elasticsearch.helpers.bulk(client=es, index=index, actions=data, doc_type="document", raise_on_error=True) 

print(time.time()-start) 

回答

0

您總是可以簡單地將數據拆分爲n個大致相同大小的集合,這樣每個集合都適合內存,然後執行n個批量更新。這似乎是對我來說最簡單的解決方案。

+0

是的,這是一個很好的解決方案 - 但它以某種方式重寫了流式傳輸功能。我正在尋找可能內置的東西。 – WoJ 2015-04-02 11:34:14

+0

你看過嗎? http://elasticsearch-py.readthedocs.org/en/latest/helpers.html#elasticsearch.helpers.streaming_bulk – 2015-04-02 11:39:48