0

我在ElasticSearch中加載了具有分散在多個事件中的數據的日誌行,比如event_id在事件(行)號5中,event_action在事件號88中可用,進一步event_port信息在事件編號455中可用。我如何提取這些數據以便我的輸出看起來像下面一樣。對於這種情況,多行編解碼器不起作用。使用單個logstash過濾器從Elasticsearch提取多個事件的數據

{ 
event_id: 1223 
event_action: "socket_open" 
event_port: 76654 
} 

目前我有日誌文件持久化,所以我可以從ES獲取文件路徑。我試圖從ruby過濾器執行shell腳本,這個shell腳本可以執行grep命令並將stdout數據放入新事件中,如下所示。

input { 
    elasticsearch { 
    hosts => "localhost:9200" 
    index => "my-logs" 
    } 
} 

filter 
{ 

    ruby { 
    code => 'require "open3" 
      file_path = event.get("file_path") 
      cmd = "my_filter.sh -f #{file_path}" 
      stdin, stdout, stderr = Open3.popen3(cmd) 
      event.set("process_result", stdout.read) 
      err = stderr.read 
      if err.to_s.empty? 
       filter_matched(event) 
      else 
       event.set("ext_script_err_msg", err) 
      end' 
     remove_field => ["file_path"] 
    } 
    } 

通過上述方法,我面臨着問題。

1)在大文件中執行grep可能非常耗時。有沒有其他的選擇,而不需要grep文件? 2)我的輸入插件(附上)從Elastic Search中獲取事件,該事件的file_path設置爲索引中的「ALL」事件,這使my_filter.sh被多次執行,這是我想避免的。我如何從ES中提取獨特的file_path?

回答

0

Elasticsearch沒有根據輸入建立輸出流。 Elastic是一個noSQL數據庫,數據應該隨時間消耗(以實時方式)。這意味着您應該先將所有內容存儲在Elasticsearch中,然後處理數據。在你的情況下,你通過等待不同的事件來影響流量。

如果您確實需要捕獲這些事件並在後臺處理它,您可以在logstash(輸入爲nxlog)或python腳本(用作logstash中的過濾器)過濾之前嘗試使用nxlog之類的東西。在你的情況下,我會預處理我的數據以合併它,然後將它發送到logstash