2017-01-03 59 views
2

我想指數一堆大大熊貓dataframes(約400萬行和50列)到Elasticsearch。指數一大熊貓據幀到Elasticsearch沒有elasticsearch-PY

在尋找如何做到這一點的例子,大多數人都會用elasticsearch-py's bulk helper method,通過它的一個實例of the Elasticsearch class它處理的連接以及其上創建with pandas' dataframe.to_dict(orient='records') method詞典列表。元數據可以預先作爲新列插入到數據幀中,例如, df['_index'] = 'my_index'

但是,我有理由不使用elasticsearch-py庫,並希望直接與Elasticsearch bulk API交談,例如,通過requests或其他方便的HTTP庫。此外,df.to_dict()是大dataframes,可惜很慢,一個數據幀轉換爲類型的字典列表,然後通過elasticsearch-PY序列化JSON聽起來像是不必要的開銷時,有類似dataframe.to_json()的速度非常快,即使在大dataframes。

什麼會得到一個數據框大熊貓成大宗原料藥所需要的格式的方便,快捷的方法呢?我認爲,在正確的方向邁出的一步如下使用dataframe.to_json()

import pandas as pd 
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}]) 
df 
    a b 
0 1 2 
1 3 4 
2 5 6 
df.to_json(orient='records', lines=True) 
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}' 

現在這是一個新行分隔的JSON字符串,但是,它仍然缺乏的元數據。什麼是表演方式讓它在那裏?

編輯: 爲了完整性,元數據JSON文件將看起來像:

{"index": {"_index": "my_index", "_type": "my_type"}} 

因此,在端部通過本體API預期整個JSON看起來像 這(與另外的最後行之後換行符):

{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":1,"b":2} 
{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":3,"b":4} 
{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":5,"b":6} 
+0

您可以發佈一個預期的元數據爲您的樣品DF? – MaxU

+0

當然,請看我的編輯。 – Dirk

+0

我不明白格式(結構) - 它不是一個有效的JSON。您是否可以嘗試使用其批量API將此小型「JSON」加載到ElasticSearch中進行一些小測試? – MaxU

回答

1

同時我發現了多種可能性如何做到這一點至少合理的速度:

import json 
import pandas as pd 
import requests 

# df is a dataframe or dataframe chunk coming from your reading logic 
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id 
df_as_json = df.to_json(orient='records', lines=True) 

final_json_string = '' 
for json_document in df_as_json.split('\n'): 
    jdict = json.loads(json_document) 
    metadata = json.dumps({'index': {'_id': jdict['_id']}}) 
    jdict.pop('_id') 
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n' 

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} 
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

代替使用熊貓to_json()方法,其中一個也可以使用to_dict()如下。這是在我的測試速度較慢,但​​沒有多少:

dicts = df.to_dict(orient='records') 
final_json_string = '' 
for document in dicts: 
    metadata = {"index": {"_id": document["_id"]}} 
    document.pop('_id') 
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n' 

當大數據集運行此,我們可以通過安裝它,然後import ujson as jsonimport rapidjson as jsonujsonrapidjson更換Python的默認json庫保存了兩三分鐘, 分別。

通過用並行執行替換順序執行的步驟可以實現更大的加速,以便在請求等待Elasticsearch處理所有文檔並返回響應時,讀取和轉換不會停止。這可以通過線程,多處理,Asyncio,任務隊列完成......但這不在這個問題的範圍之內。

如果你碰巧發現的方法以更快的速度做到JSON的轉換,讓我知道。

+0

只要看看這段代碼,就可以序列化成json,然後再次反序列化循環。我想你可以通過使用'df.iterrows'來獲得簡單的加速,然後在行本身上只調用'to_json' – szxk

1

此功能插入大熊貓據幀到彈性搜索(由大塊大塊)

def insertDataframeIntoElastic(dataFrame,index='index', typ = 'test', server = 'http://localhost:9200', 
          chunk_size = 2000): 
    headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'} 
    records = dataFrame.to_dict(orient='records') 
    actions = ["""{ "index" : { "_index" : "%s", "_type" : "%s"} }\n""" % (index, typ) +json.dumps(records[j]) 
        for j in range(len(records))] 
    i=0 
    while i<len(actions): 
     serverAPI = server + '/_bulk' 
     data='\n'.join(actions[i:min([i+chunk_size,len(actions)])]) 
     data = data + '\n' 
     r = requests.post(serverAPI, data = data, headers=headers) 
     print r.content 
     i = i+chunk_size