相關:Combine logs and query in ELK歸屬關係的消息通過現場
我們正在建立ELK並希望創造Kibana 4 這裏的問題可視化是我們希望兩個不同類型的消息之間進行關聯。
爲了簡化:
- 消息類型1字段:message_type,common_id_number,BYTE_COUNT, ...
- 消息類型2字段:message_type,common_id_number,主機名,...
這兩個消息在elasticsearch中共享相同的索引。
正如你可以看到,我們試圖圖形,而不採取任何common_id_number考慮,但似乎我們必須使用它。儘管如此,我們還不知道如何。
任何幫助?
編輯
這些都是在ES模板中的相關字段定義:
"URIHost" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"Type" : {
"type" : "string",
"norms" : {
"enabled" : false
},
"fields" : {
"raw" : {
"type" : "string",
"index" : "not_analyzed",
"ignore_above" : 256
}
}
},
"SessionID" : {
"type" : "long"
},
"Bytes" : {
"type" : "long"
},
"BytesReceived" : {
"type" : "long"
},
"BytesSent" : {
"type" : "long"
},
這是業務類型,編輯的文檔:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqdBjpQiRid-uxPjE",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:55.543Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:58",
"Type": "TRAFFIC",
"SessionID": 21713,
"Bytes": 939,
"BytesSent": 480,
"BytesReceived": 459,
},
"fields": {
"@timestamp": [
1446760795543
]
},
"sort": [
1446760795543
]
}
這是一個威脅類型文檔:
{
"_index": "logstash-2015.11.05",
"_type": "paloalto",
"_id": "AVDZqVNIpQiRid-uxPjC",
"_score": null,
"_source": {
"@version": "1",
"@timestamp": "2015-11-05T21:59:23.440Z",
"syslog_severity_code": 5,
"syslog_facility_code": 1,
"syslog_timestamp": "Nov 5 22:59:26",
"Type": "THREAT",
"SessionID": 21713,
"URIHost": "whatever.nevermind.com",
"URIPath": "/connectiontest.html"
},
"fields": {
"@timestamp": [
1446760763440
]
},
"sort": [
1446760763440
]
}
這是logstash「過濾器」的配置:
filter {
if [type] == "paloalto" {
syslog_pri {
remove_field => [ "syslog_facility", "syslog_severity" ]
}
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
}
remove_field => [ "message" ]
}
if [Type] == "THREAT" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
grok {
match => {
"URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
}
remove_field => [ "URL" ]
}
}
else if [Type] == "TRAFFIC" {
csv {
source => "log"
columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
remove_field => [ "log" ]
}
mutate {
convert => {
"SessionID" => "integer"
"SourcePort" => "integer"
"DestinationPort" => "integer"
"NATSourcePort" => "integer"
"NATDestinationPort" => "integer"
"Bytes" => "integer"
"BytesSent" => "integer"
"BytesReceived" => "integer"
"ElapsedTimeInSecs" => "integer"
}
remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
}
}
date {
match => [ "syslog_timastamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
timezone => "CET"
remove_field => [ "syslog_timestamp" ]
}
}
}
我們正在試圖做的是可視化URIHost條款X軸和字節,BytesSent和金額與BytesReceived作爲Y軸。
什麼是'message_type'字段的值,做「type 1」消息總是在「type 2」之前? – Val
你還可以分享你現有的Logstash配置,這樣人們就不會猜測你的設置了嗎? – Val
「正如你所見」的意思是「請盯着我的屏幕截圖,並嘗試對我的意圖進行逆向工程」。你能更好地描述彈性搜索中存在的數據(也許是一張帶有實際樣本的表格),以及如何呈現這些數據(向我們展示應該如何組合表格等),然後更好地描述您的問題與可視化它。 –