I want to set up a single Logstash worker that pulls all messages from one AMQP/RabbitMQ queue, filters some of them and sends those to statsD, but also forwards every message to Elasticsearch. The implementation below simply doesn't send any messages to Elasticsearch. How do I set up a 1:N workflow with filters in Logstash?
input {
  rabbitmq {
    host => "amqp-host"
    queue => "elasticsearch"
    key => "elasticsearch"
    exchange => "elasticsearch"
    type => "all"
    durable => true
    auto_delete => false
    exclusive => false
    format => "json_event"
    debug => false
  }
}
filter {
  grep {
    add_tag => "grepped"
    match => ["@message", "Execution of .*? took .* sec"]
  }
  grok {
    tags => ["grepped"]
    add_tag => "grokked"
    pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec"
  }
  mutate {
    tags => ["grepped", "grokked"]
    lowercase => [ "command_name" ]
    add_tag => ["mutated"]
  }
}
output {
  elasticsearch_river {
    type => "all"
    rabbitmq_host => "amqp-host"
    debug => false
    durable => true
    persistent => true
    es_host => "es-host"
    exchange => "logstash-elasticsearch"
    exchange_type => "direct"
    index => "logs-%{+YYYY.MM.dd}"
    index_type => "%{@type}"
    queue => "logstash-elasticsearch"
  }
  statsd {
    type => "command-filter"
    tags => ["grepped", "grokked", "mutated"]
    host => "some.domain.local"
    port => 1234
    sender => ""
    namespace => ""
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"]
    increment => ["prefix.%{command_name}.suffix"]
  }
}
Is there some catch-all filter? Or a way to arrange the tags so that some messages are filtered while all messages are still forwarded to ES?
Could you please expand on what the clone does here? Does the order of items in the filter section matter? – FuzzyAmi
@FuzzyAmi It's been a while, so I took a quick look at the docs. The `clone` filter creates new log events. However, each cloned event passes through the entire pipeline again. So... I clone every message and add a "clone" tag via `add_tag => ['clone']`; then, when the clone comes through again, `exclude_tags => ['clone']` makes sure I don't end up in an infinite loop of cloned events. The `clones => ['statsd', 'elastic-search']` key/value creates two cloned events, one per name. Think of it as a branching mechanism for downstream processing. – TCopple
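A minimal sketch of the clone-based branching TCopple describes, using the same legacy Logstash 1.x syntax as the question. The tag and clone names mirror the comment; the output bodies are placeholders standing in for the full `elasticsearch_river` and `statsd` configurations shown above, and the exact per-clone `type` behavior should be checked against the docs for your Logstash version:

```conf
filter {
  clone {
    # Skip events that are already clones, so clones don't get
    # re-cloned on their second pass through the pipeline.
    exclude_tags => ["clone"]
    # Emit two copies of each original event, one per name.
    clones => ["statsd", "elastic-search"]
    # Mark the copies so exclude_tags above can recognize them.
    add_tag => ["clone"]
  }
  # Existing grep/grok/mutate filters can then target one branch via tags.
}
output {
  # Each output selects its branch; the original untouched event can
  # still go to Elasticsearch while only one clone feeds statsd.
  elasticsearch_river {
    tags => ["elastic-search"]
    # ... es_host, exchange, index settings as above ...
  }
  statsd {
    tags => ["statsd", "grepped", "grokked", "mutated"]
    # ... host, port, timing/increment settings as above ...
  }
}
```

Filter order does matter here: the `clone` must run before the filters that should apply to (or skip) the cloned branches.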
Thanks for clarifying! – FuzzyAmi