2017-02-15 112 views
0

環境Logstash過濾和解析模輸出

  • 的Ubuntu 16.04
  • Logstash 5.2.1
  • ElasticSearch 5.1

我已經配置好了DEIS平臺將日誌發送到我們的沒有問題的Logstack節點。不過,我對Ruby還是個新手,Regexes並不是我的強項。

日誌示例

2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n 

Logstash配置:

input { 
    tcp { 
     port => 5000 
     type => syslog 
     codec => plain 
    } 
    udp { 
     port => 5000 
     type => syslog 
     codec => plain 
    } 
} 

filter { 
    json { 
     source => "syslog_message" 
    } 
} 

output { 
    elasticsearch { hosts => ["foo.somehost"] } 
} 

Elasticsearch輸出:

"@timestamp" => 2017-02-15T14:55:24.408Z, 
"@version" => "1", 
"host" => "x.x.x.x", 
"message" => "2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n", 
"type" => "json" 

期望的結果:

"@timestamp" => 2017-02-15T14:55:24.408Z, 
"@version" => "1", 
"host" => "x.x.x.x", 
"type" => "json" 
"container" => "deis-logspout" 
"severity level" => "Info" 
"message" => "routing all to udp://x.x.x.x:xxxx\n" 

我怎樣才能提取信息從消息中爲它們單獨的領域?

+0

你的意思是你的提取*消息*爲四個不同的領域* *(部件如你所提到的)?您是否已經從上面的ES **輸出**中看到了這一點。那麼你在這裏有什麼問題?關於方法? – Kulasangar

+0

對不起,如果這不簡明,我更新了這個問題。 – user7565843

回答

0

不幸的是,你對你想要做什麼的假設略有偏差,但我們可以解決這個問題!

您爲JSON創建了一個正則表達式,但是您沒有解析JSON。你只是簡單地解析一個混雜的syslog日誌(參見source中的syslogStreamer),但實際上並不是syslog格式(RFC 5424或3164)。 Logstash隨後提供JSON輸出。

讓我們分解消息,它將成爲您解析的源代碼。關鍵是你必須正面解析消息。

消息:

2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx\n 
  • 2017-02-15T14:55:24UTC:時間戳是一種常見的神交圖案。這主要遵循TIMESTAMP_ISO8601但不完全。
  • deis-logspout[1]:這將是您的日誌源,您可以命名容器。您可以使用grok模式URIHOST
  • routing all to udp://x.x.x.x:xxxx\n:由於對於大多數日誌的消息被包含在該消息的末尾,可以只然後使用神交圖案GREEDYDATA這是.*在正則表達式的等效。
  • 2017/02/15 14:55:24:另一個時間戳(爲什麼?)與常見的grok模式不匹配。

使用grok過濾器,可以將語法(從正則表達式抽象)映射到語義(您提取的值的名稱)。例如%{URIHOST:container}

你會看到我做了一些hack grock過濾器來完成格式化工作。即使您不打算捕獲結果,也可以匹配文本的部分內容。如果您無法更改時間戳的格式以符合標準,請創建一個自定義模式。

配置:

input { 
    tcp { 
     port => 5000 
     type => deis 
    } 
    udp { 
     port => 5000 
     type => deis 
    } 
} 

filter { 
    grok { 
     match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}(UTC|CST|EST|PST) %{URIHOST:container}\[%{NUMBER}\]: %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME} %{GREEDYDATA:msg}" } 
    } 
} 

output { 
    elasticsearch { hosts => ["foo.somehost"] } 
} 

輸出:

{ 
    "container" => "deis-logspout", 
    "msg" => "routing all to udp://x.x.x.x:xxxx", 
    "@timestamp" => 2017-02-22T23:55:28.319Z, 
    "port" => 62886, 
    "@version" => "1", 
    "host" => "10.0.2.2", 
    "message" => "2017-02-15T14:55:24UTC deis-logspout[1]: 2017/02/15 14:55:24 routing all to udp://x.x.x.x:xxxx", 
    "timestamp" => "2017-02-15T14:55:24" 
    "type" => "deis" 
} 

你還可以變異的項目下降@timestamp,@host等,因爲這些是由Logstash提供默認。另一個建議是使用date filter將發現的任何時間戳轉換爲可用格式(更適合搜索)。

根據日誌格式的不同,您可能必須稍微改變模式。我只有一個例子可以解決。這也保留了原始的完整信息,因爲在Logstash中完成的任何字段操作都具有破壞性(它們用相同名稱的字段覆蓋值)。

資源:

+0

感謝您的意見。我將如何配置一個Grok過濾器來解析消息? – user7565843

+0

再次感謝Signus,我已經更新了上述問題以獲得更好的上下文。 – user7565843