2016-12-13 88 views
2

問題。我一直負責研究如何在Elasticsearch中回填數據。到目前爲止有點空虛。基本要點是:Elasticsearch在計算後回填兩個字段到一個新字段

注意:所有文件都存儲在每日索引下,每天約200k文件。

  • 我需要能夠重新索引60天左右的數據。
  • 我需要爲每個doc payload.time_sec和payload.time_nanosec帶兩個字段,取其中的值並對它們進行一些計算(time_sec * 10 ** 9 + time_nanosec),然後將其作爲單個字段返回到重新索引文件

我期待在散裝傭工的Python API文檔: http://elasticsearch-py.readthedocs.io/en/master/helpers.html

但我想知道如果這甚至有可能。

我的想法是使用: 批量助手拉取滾動ID(批量_update?),迭代每個文檔ID,從兩個字段中爲每個碼頭拉入數據,進行數學計算並完成更新請求新的現場數據。

任何人都這樣做?也許有一個groovy腳本的東西?

謝謝!

回答

1

批量幫助器拉取滾動ID(批量_update?),迭代每個文檔ID,從每個擴展塢的兩個字段中提取數據,進行數學計算,並使用新字段完成更新請求數據。

基本上是這樣:

  • 使用/_search?scroll來獲取文檔
  • 執行您的操作
  • 發送/_bulk更新請求

其他選項是:

兩者都支持腳本,如果我理解正確的話,沃爾德是完美的選擇,因爲你的更新不取決於外部因素,所以這可以直接在服務器內完成。

+0

我一直在使用python來解決這個問題到目前爲止,將在一個新的回覆中發佈代碼片段 – fastfiveoh

+0

@fastfiveoh你最終採用哪種解決方案?我遇到類似的問題,並想知道哪種方式是最好的。 –

+0

@RobinWang我終於寫了腳本,你可以在我的回購中看到它:[link] https://github.com/fastfiveoh/python-es-reindex/blob/master/backfill_data – fastfiveoh

0

這是我在哪裏,在(大約):

我一直努力用Python和大宗助手,到目前爲止我在這裏:

doc = helpers.scan(es, query={ 
"query": { 
"match_all": {} 

}, 
"size":1000 
},index=INDEX, scroll='5m', raise_on_error=False) 


    for x in doc: 
x['_index'] = NEW_INDEX 
try: 
    time_sec = x['_source']['payload']['time_sec'] 
    time_nanosec=x['_source']['payload']['time_nanosec'] 
    duration = (time_sec * 10**9) + time_nanosec 
except KeyError: pass 

count = count + 1 

x['_source']['payload']['duration'] = duration 
new_index_data.append(x) 

helpers.bulk(es,new_index_data) 

在這裏,我只是用大頭python助手插入到一個新的索引。不過,我會嘗試使用批量更新對現有索引進行更改和測試。

這看起來像一個正確的方法?

+0

也是我搬到一個新的索引,然後將刪除舊的索引,並指向一個別名到新的索引是採取一個新的映射模板。 – fastfiveoh

+0

聽起來更適合使用'_reindex' API – mark

+0

雅我覺得reindex會很好。我正在檢查數據完整性,回溯3個月,回填大約6-8個新領域。 – fastfiveoh