
Related: Combine logs and query in ELK (correlating messages through a shared field)

We are setting up ELK and want to create visualizations in Kibana 4. The problem is that we want to correlate two different types of messages.

To simplify:

  • Message type 1 fields: message_type, common_id_number, BYTE_COUNT, ...
  • Message type 2 fields: message_type, common_id_number, hostname, ...

Both message types share the same index in elasticsearch.

[Screenshot: the desired Kibana visualization (bytes per URIHost)]

As you can see, we are trying to build the graph without taking the common_id_number into account, but it seems we have to use it. We just don't know how yet.

Any help?

EDIT

These are the relevant field definitions in the ES template:

 "URIHost" : { 
     "type" : "string", 
     "norms" : { 
      "enabled" : false 
     }, 
     "fields" : { 
      "raw" : { 
      "type" : "string", 
      "index" : "not_analyzed", 
      "ignore_above" : 256 
      } 
     } 
     }, 
     "Type" : { 
     "type" : "string", 
     "norms" : { 
      "enabled" : false 
     }, 
     "fields" : { 
      "raw" : { 
      "type" : "string", 
      "index" : "not_analyzed", 
      "ignore_above" : 256 
      } 
     } 
     }, 
     "SessionID" : { 
     "type" : "long" 
     }, 
     "Bytes" : { 
     "type" : "long" 
     }, 
     "BytesReceived" : { 
     "type" : "long" 
     }, 
     "BytesSent" : { 
     "type" : "long" 
     }, 
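For context, these field definitions would normally sit inside an index template. A minimal sketch of the surrounding structure, assuming the logstash-* index pattern and the paloalto mapping type that appear in the documents below (only a subset of the fields is repeated here):

    PUT /_template/logstash
    {
        "template" : "logstash-*",
        "mappings" : {
            "paloalto" : {
                "properties" : {
                    "URIHost" : {
                        "type" : "string",
                        "norms" : { "enabled" : false },
                        "fields" : {
                            "raw" : { "type" : "string", "index" : "not_analyzed", "ignore_above" : 256 }
                        }
                    },
                    "SessionID" : { "type" : "long" },
                    "Bytes" : { "type" : "long" }
                }
            }
        }
    }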

This is a TRAFFIC type document (redacted):

    {
        "_index": "logstash-2015.11.05",
        "_type": "paloalto",
        "_id": "AVDZqdBjpQiRid-uxPjE",
        "_score": null,
        "_source": {
            "@version": "1",
            "@timestamp": "2015-11-05T21:59:55.543Z",
            "syslog_severity_code": 5,
            "syslog_facility_code": 1,
            "syslog_timestamp": "Nov 5 22:59:58",
            "Type": "TRAFFIC",
            "SessionID": 21713,
            "Bytes": 939,
            "BytesSent": 480,
            "BytesReceived": 459
        },
        "fields": {
            "@timestamp": [
                1446760795543
            ]
        },
        "sort": [
            1446760795543
        ]
    }

And this is a THREAT type document:

    {
        "_index": "logstash-2015.11.05",
        "_type": "paloalto",
        "_id": "AVDZqVNIpQiRid-uxPjC",
        "_score": null,
        "_source": {
            "@version": "1",
            "@timestamp": "2015-11-05T21:59:23.440Z",
            "syslog_severity_code": 5,
            "syslog_facility_code": 1,
            "syslog_timestamp": "Nov 5 22:59:26",
            "Type": "THREAT",
            "SessionID": 21713,
            "URIHost": "whatever.nevermind.com",
            "URIPath": "/connectiontest.html"
        },
        "fields": {
            "@timestamp": [
                1446760763440
            ]
        },
        "sort": [
            1446760763440
        ]
    }

This is the logstash "filter" configuration:

filter { 
    if [type] == "paloalto" { 
     syslog_pri { 
      remove_field => [ "syslog_facility", "syslog_severity" ] 
     } 

     grok { 
      match => { 
       "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}" 
      } 
      remove_field => [ "message" ] 
     } 

     if [Type] == "THREAT" { 
      csv { 
       source => "log" 
       columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ] 
       remove_field => [ "log" ] 
      } 
      mutate { 
       convert => { 
        "SessionID" => "integer" 
        "SourcePort" => "integer" 
        "DestinationPort" => "integer" 
        "NATSourcePort" => "integer" 
        "NATDestinationPort" => "integer" 
       } 
       remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ] 
      } 
      grok { 
       match => { 
        "URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?" 
       } 
       remove_field => [ "URL" ] 
      } 
     } 

     else if [Type] == "TRAFFIC" { 
      csv { 
       source => "log" 
       columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ] 
       remove_field => [ "log" ] 
      } 
      mutate { 
       convert => { 
        "SessionID" => "integer" 
        "SourcePort" => "integer" 
        "DestinationPort" => "integer" 
        "NATSourcePort" => "integer" 
        "NATDestinationPort" => "integer" 
        "Bytes" => "integer" 
        "BytesSent" => "integer" 
        "BytesReceived" => "integer" 
        "ElapsedTimeInSecs" => "integer" 
       } 
       remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ] 
      } 
     } 

     date { 
      match => [ "syslog_timastamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] 
      timezone => "CET" 
      remove_field => [ "syslog_timestamp" ] 
     } 
    } 
} 
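For reference, here is a hypothetical syslog line of the shape the grok pattern above is built to match (the hostname and all values are invented for illustration, and the CSV tail is truncated):

    Nov 5 22:59:58 fw01 1,2015/11/05 22:59:58,001122334455,TRAFFIC,end,...

The grok extracts the syslog timestamp, the hostname, and the Type field (TRAFFIC or THREAT), and hands the remaining comma-separated tail to the csv filters as the log field.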

What we are trying to do is visualize URIHost terms on the X axis and the sum of Bytes, BytesSent and BytesReceived on the Y axis.


What is the value of the 'message_type' field, and do "type 1" messages always come before "type 2" messages? – Val


Could you also share your existing Logstash configuration, so that people don't have to guess at your setup? – Val


"As you can see" here means "please stare at my screenshot and try to reverse-engineer my intent". Could you describe the data that exists in elasticsearch more concretely (perhaps a table with actual samples), show how it should be combined and presented, and then describe your problem with visualizing it? –

Answer


I think you can use the aggregate filter to achieve this. The aggregate filter supports aggregating several log lines into one single event based on a common field value. In your case, the common field would be the SessionID field.

Then we need another field to distinguish the first event from the second/last event to be aggregated. In your case, that would be the Type field.

You need to change your current configuration like this:

filter { 

    ... all other filters 

    if [Type] == "THREAT" { 
     ... all other filters 

     aggregate { 
      task_id => "%{SessionID}" 
      code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']" 
     } 
    } 

    else if [Type] == "TRAFFIC" { 
     ... all other filters 

     aggregate { 
      task_id => "%{SessionID}" 
      code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']" 
      end_of_task => true 
      timeout => 120 
     } 
    } 
} 
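One caveat worth adding that the answer does not state: the aggregate filter keeps its map in local process memory, so the correlation only works reliably when Logstash runs with a single filter worker, for example (the config path is an assumption):

    bin/logstash -w 1 -f /etc/logstash/conf.d/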

The general idea is that when Logstash encounters a THREAT log, it will temporarily store the URIHost and URIPath of the event in an in-memory map, and then, when the matching TRAFFIC log comes in, the URIHost and URIPath fields will be added to that event. You can copy other fields as well if you need them. You can also adjust the timeout (in seconds) depending on how long you expect a TRAFFIC event to occur after the last THREAT event.
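To illustrate with the two sample documents above, the enriched TRAFFIC event would end up looking roughly like this (metadata and unrelated fields omitted):

    {
        "@timestamp": "2015-11-05T21:59:55.543Z",
        "Type": "TRAFFIC",
        "SessionID": 21713,
        "Bytes": 939,
        "BytesSent": 480,
        "BytesReceived": 459,
        "URIHost": "whatever.nevermind.com",
        "URIPath": "/connectiontest.html"
    }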

In the end, you will get documents whose data is merged from the THREAT and TRAFFIC log lines, and you can easily create the visualization shown in your screenshot, displaying the number of bytes per URIHost.
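Once the documents are merged, the visualization boils down to a terms aggregation on URIHost with sum sub-aggregations. A sketch of the equivalent Elasticsearch request, assuming the not_analyzed URIHost.raw sub-field from the template above:

    GET /logstash-*/paloalto/_search
    {
        "size": 0,
        "aggs": {
            "per_host": {
                "terms": { "field": "URIHost.raw" },
                "aggs": {
                    "bytes": { "sum": { "field": "Bytes" } },
                    "bytes_sent": { "sum": { "field": "BytesSent" } },
                    "bytes_received": { "sum": { "field": "BytesReceived" } }
                }
            }
        }
    }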


I left URIPath out of the parsing to get a cleaner URIHost. Thanks! – AMS


Great, glad it worked out! – Val