2016-03-21

How can I append to an array in Elasticsearch from a CSV?

Example:

A CSV file contains these lines:

id,key1,key2 
1,toto1,toto2 
1,titi1,titi2 
2,tata1,tata2 

With Logstash, the result should be 2 documents in Elasticsearch, each a JSON object with the matching rows appended to an array:

{ 
    "id": 1, 
    "keys": [{ 
     "key1": "toto1", 
     "key2": "toto2" 
    }, { 
     "key1": "titi1", 
     "key2": "titi2" 
    }] 
} 
,{ 
    "id": 2, 
    "keys": [{ 
     "key1": "tata1", 
     "key2": "tata2" 
    }] 
} 

Kind regards

Answer


First, create your ES mapping (if necessary), declaring the inner objects as nested objects.

{ 
    "mappings": { 
        "key_container": { 
            "properties": { 
                "id": { 
                    "type": "keyword", 
                    "index": true 
                }, 
                "keys": { 
                    "type": "nested", 
                    "properties": { 
                        "key1": { 
                            "type": "keyword", 
                            "index": true 
                        }, 
                        "key2": { 
                            "type": "text", 
                            "index": true 
                        } 
                    } 
                } 
            } 
        } 
    } 
} 

The keys property will contain the array of nested objects.
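To make the target shape concrete, here is a minimal Python sketch that groups the sample CSV rows by id and collects each row's key1/key2 pair into a `keys` list. It only illustrates the expected document structure and does not talk to Elasticsearch; the names `CSV_TEXT` and `group_rows` are hypothetical and not part of the original answer.

```python
import csv
import io

# Sample data from the question.
CSV_TEXT = """id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
"""

def group_rows(csv_text):
    """Group CSV rows by id, collecting key1/key2 pairs into a 'keys' list."""
    docs = {}
    # DictReader consumes the header row, so it never appears as data.
    for row in csv.DictReader(io.StringIO(csv_text)):
        doc = docs.setdefault(row["id"], {"id": row["id"], "keys": []})
        doc["keys"].append({"key1": row["key1"], "key2": row["key2"]})
    return list(docs.values())

documents = group_rows(CSV_TEXT)
```

Here `documents` holds one entry per distinct id, and each entry's `keys` list has one nested object per CSV row, which is the structure the nested mapping is meant to store.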

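Then you can load the CSV with Logstash in two hops:

  1. Index (create) the base object containing only the id property
  2. Update the base object with the keys property, which contains the array of nested objects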
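The two hops can be sketched in plain Python, with an in-memory dict standing in for the Elasticsearch index. This is only a simulation of the logic under that assumption, not the Logstash or Elasticsearch API; `hop_one_index`, `hop_two_update`, and `store` are hypothetical names introduced for illustration.

```python
import csv
import io

CSV_TEXT = """id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
"""

def load_rows(csv_text):
    """Parse the CSV; DictReader skips the header row for us."""
    return list(csv.DictReader(io.StringIO(csv_text)))

def hop_one_index(store, rows):
    """Hop 1: index one base document per id, keeping only the id field."""
    for row in rows:
        store[row["id"]] = {"id": row["id"]}

def hop_two_update(store, rows):
    """Hop 2: append each row's key object to the document's 'keys' array."""
    for row in rows:
        source = store[row["id"]]  # exists because hop 1 ran first
        key = {"key1": row["key1"], "key2": row["key2"]}
        if "keys" in source:
            source["keys"].append(key)
        else:
            # The first element defines the field as an array.
            source["keys"] = [key]

store = {}  # in-memory stand-in for the index, keyed by document id
rows = load_rows(CSV_TEXT)
hop_one_index(store, rows)
hop_two_update(store, rows)
```

Running hop 1 first resets each document to its bare id, so repeating both hops on the same file does not duplicate entries in `keys`; running only hop 2 twice would.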

First Logstash configuration (relevant part only):

filter { 
    csv { 
     columns => ["id","key1","key2"] 
     separator => "," 
     # Remove the keys because they will be loaded in the next hop with update 
     remove_field => [ "key1", "key2"] 
    } 
    # Remove the row containing the column names 
    if [id] == "id" { 
     drop { } 
    } 
} 
output { 
    elasticsearch { 
     action => "index" 
     document_id => "%{id}" 
     hosts => [ "localhost:9200" ] 
     index => "key_container" 
    } 
} 

Second-step Logstash configuration (you must enable scripting in Elasticsearch):

filter { 
    csv { 
     columns => ["id","key1","key2"] 
     separator => "," 
    } 
    # Remove the row containing the column names, as in the first hop 
    if [id] == "id" { 
     drop { } 
    } 
    # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object) 
    mutate{ 
     rename => { 
      "key1" => "[key][key1]" 
      "key2" => "[key][key2]" 
     } 
    } 
} 
output { 
    elasticsearch { 
     action => "update" 
     document_id => "%{id}" 
     doc_as_upsert => "true" 
     hosts => [ "localhost:9200" ] 
     index => "key_container" 
     script_lang => "groovy" 
     # key_container.keys is an array of key objects 
     # arrays can be built only with scripts and defined as an array when we put the first element into it 
     script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}" 
    } 
} 

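In summary, you need this two-hop load because creating the array requires a script, and scripts are only available with the update action.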