2016-11-15 74 views
0

我正在編寫logstash 2.4.0配置以通過HTTP日誌。 我們希望將Header字段中傳遞的PORT包含在下面的Line字段中。 沒有定義特定的結束事件。儘管我也嘗試添加結束事件。Logstash聚合篩選器,將信息添加到下一行

我目前使用的輸入日誌文件是:

HEADER 9200 
LINE 1 2016-10-05 08:39:00 Some log data 
LINE 2 2016-10-05 08:40:00 Some other log data 
FOOTER 
HEADER 9300 
LINE 4 2016-11-05 08:39:00 Some log data in another log 
LINE 5 2016-11-05 08:40:00 Some other log data in another log 
FOOTER 

我想有一個像這樣的輸出: 的SERVER_PORT領域目前的輸出不見了

{"message" => "HEADER 9200", 
"@version" => "1", 
"@timestamp" => "2016-11-15T11:17:18.425Z", 
"path" => "test.log", 
"host" => "hostname", 
"type" => "event", 
"env" => "test", 
"port" => 9200, 
"tags" => [[0] "Header"] } 
{"message" => "LINE 1 2016-10-05 08:39:00 Some log data", 
"@version" => "1", 
"@timestamp" => "2016-11-15T11:17:20.186Z", 
"path" => "test.log", 
"host" => "hostname", 
"type" => "event", 
"env" => "test", 
"logMessage" => "1 2016-10-05 08:39:00 Some log data", 
"Server_port" => 9200, 
"tags" => [[0] "Line"]} 
{"message" => "LINE 2 2016-10-05 08:40:00 Some other log data", 
"@version" => "1",< 
"@timestamp" => "2016-11-15T11:17:20.192Z", 
"path" => "test.log", 
"host" => "hostname", 
"type" => "event", 
"env" => "test", 
"logMessage" => "2 2016-10-05 08:40:00 Some other log data", 
"Server_port" => 9200, 
"tags" => [[0] "Line"]} 
{"message" => "FOOTER", 
"@version" => "1", 
"@timestamp" => "2016-11-15T11:17:20.195Z", 
"path" => "test.log", 
"host" => "hostname", 
"type" => "event", 
"env" => "test", 
"tags" => [[0] "Footer"]} 

試過不同的東西后,我目前使用的配置如下,用硬編碼taskid ='abcd'進行測試:

input{ file{ path => "test.log" 
       start_position => "beginning" 
       sincedb_path => "/dev/null" 
       ignore_older => 0 
       type => "event" 
       add_field => { "env" => "test"} } 
} 
filter{ 
     grok { 
       break_on_match => false 
       tag_on_failure => [] 
       match => {"message" => ["^HEADER%{SPACE}%{INT:port:int}"]} 
       add_tag => ["Header"] 
       } 
     grok { 
       break_on_match => false 
       tag_on_failure => [] 
       match => {"message" => "^LINE%{SPACE}%{GREEDYDATA:logMessage}"} 
       add_tag => ["Line"] 
       } 
     grok { 
       break_on_match => false 
       tag_on_failure => [] 
       match => {"message" => "^FOOTER"} 
       add_tag => ["Footer"] 
       }  
     if "Header" in [tags]{ 
       aggregate{ 
         task_id => "abcd" 
         code => "map['server_port'] ||= 0; map['server_port']=event['port']" 
         push_map_as_event_on_timeout => true 
         push_previous_map_as_event => true 
         map_action => "create" 
       } 
     } 
     elseif "Line" in [tags]{ 
       aggregate{ 
         task_id => "abcd" 
         code => "event.set('server_port',map['server_port'])"             
         map_action => "update" 
       } 
     } 
     else if "Footer" in [tags]{ 
       aggregate{ 
         task_id => "abcd" 
         code => "event.set('server_port',map['server_port'])"             
         map_action => "update" 
         end_of_task => true 
         timeout => 120 
       } 
     } 
} 
output { 
    stdout { codec => rubydebug } 
} 

雖然此配置無誤地運行,但它並未創建server_port字段。 我哪裏錯了?

回答

0

擺弄完一些後,我有一個工作測試用例。 我已經改變了配置如下:

grok { 
       break_on_match => false 
       tag_on_failure => [] 
       match => { 
        "message" => ["^HEADER%{SPACE}%{INT:taskid:int}%{SPACE}%{INT:port:int}"] 
       } 
       add_tag => ["Header"] 
       } 

if "Header" in [tags]{ 
      aggregate{ 
        task_id => "%{taskid}" 
        code => "map['port']=event.get('port')" 
        map_action => "create" 
      } 
    } 
    elseif "Line" in [tags]{ 
      aggregate{ 
        task_id =>"%{taskid}" 
        code => "event.set('port',map['port'])" 
        map_action => "update" 
      } 
    } 
    else if "Footer" in [tags]{ 
      aggregate{ 
        task_id => "%{taskid}" 
        code => "event.set('port',map['port'])" 
        map_action => "update" 
        end_of_task => true 
        timeout => 120 
      } 
    } 

,並增加了任務id字段的日誌:

HEADER 123 9200 
LINE 123 2016-10-05 08:39:00 Some log data