2013-06-27 58 views
0

我想設置一個從一個amqp/rabbitmq隊列獲取所有消息的單個logstash worker,過濾一些消息發送到statsD,但是也會將所有消息發送到彈性搜索。以下實現只是不會將任何消息發送到ElasticSearch。如何在Logstash中使用篩選器設置1:N工作流程?

input { 
    rabbitmq { 
    host => "amqp-host" 
    queue => "elasticsearch" 
    key => "elasticsearch" 
    exchange => "elasticsearch" 
    type => "all" 
    durable => true 
    auto_delete => false 
    exclusive => false 
    format => "json_event" 
    debug => false 
    } 
} 

filter { 
    grep { 
     add_tag => "grepped" 
     match => ["@message", "Execution of .*? took .* sec"] 
    } 

    grok { 
     tags => ["grepped"] 
     add_tag => "grokked" 
     pattern => "Execution of %{DATA:command_name} took %{DATA:response_time} sec" 
    } 

    mutate { 
     tags => ["grepped", "grokked"] 
     lowercase => [ "command_name" ] 
     add_tag => ["mutated"] 
    } 
} 

output { 
    elasticsearch_river { 
    type => "all" 
    rabbitmq_host => "amqp-host" 
    debug => false 
    durable => true 
    persistent => true 
    es_host => "es-host" 
    exchange => "logstash-elasticsearch" 
    exchange_type => "direct" 
    index => "logs-%{+YYYY.MM.dd}" 
    index_type => "%{@type}" 
    queue => "logstash-elasticsearch" 
    } 

statsd { 
    type => "command-filter" 
    tags => ["grepped", "grokked", "mutated"] 
    host => "some.domain.local" 
    port => 1234 
    sender => "" 
    namespace => "" 
    timing => ["prefix.%{command_name}.suffix", "%{response_time}"] 
    increment => ["prefix.%{command_name}.suffix"] 
    } 
} 

是否有一些catchall過濾器?或者一種安排標籤的方式,以便一些消息被過濾但所有消息都被轉發給ES?

回答

1

clone過濾器派上用場。以下是我的結果配置文件。

input { 
    rabbitmq { 
    host => "amqp-host" 
    queue => "elasticsearch" 
    key => "elasticsearch" 
    exchange => "elasticsearch" 
    type => "all" 
    durable => true 
    auto_delete => false 
    exclusive => false 
    format => "json_event" 
    debug => false 
    } 
} 

filter { 
    clone { 
     exclude_tags => ["cloned"] 
     clones => ["statsd", "elastic-search"] 
     add_tag => ["cloned"] 
    } 

    grep { 
     type => "statsd" 
     add_tag => "grepped" 
     match => ["@message", "Execution of .*Command took .* sec"] 
    } 

    grok { 
     type => "statsd" 
     tags => ["grepped"] 
     add_tag => "grokked" 
     pattern => "Execution of %{DATA:command_name}Command took %{DATA:response_time} sec" 
    } 

    mutate { 
     type => "statsd" 
     tags => ["grepped", "grokked"] 
     lowercase => [ "command_name" ] 
     add_tag => ["mutated"] 
    } 
} 

output { 
    elasticsearch_river { 
    type => "all" 
    rabbitmq_host => "amqp-host" 
    debug => false 
    durable => true 
    persistent => true 
    es_host => "es-host" 
    exchange => "logstash-elasticsearch" 
    exchange_type => "direct" 
    index => "logs-%{+YYYY.MM.dd}" 
    index_type => "%{@type}" 
    queue => "logstash-elasticsearch" 
    } 

    statsd { 
    type => "statsd" 
    tags => ["grepped", "grokked", "mutated"] 
    host => "some.host.local" 
    port => 1234 
    sender => "" 
    namespace => "" 
    timing => ["commands.%{command_name}.responsetime", "%{response_time}"] 
    increment => ["commands.%{command_name}.requests"] 
    } 
} 
+0

能否請您對克隆在這裏所做的是什麼展開?過濾器中的項目順序是否重要? – FuzzyAmi

+1

@FuzzyAmi這已經有一段時間了,我快速瀏覽了文檔。 「克隆」過濾器會創建新的日誌事件。然而,每個'克隆'事件都會再次通過整個管道。所以...我克隆每個消息,並通過add_tag => ['clone']添加一個「克隆」標記,然後當它再次通過'exclude_tags => ['clone']'確保我不會結束無限循環的克隆事件。 'clones => ['statsd','elastic-search']'鍵/值會爲每個標記創建2個克隆事件。 把它看作下游處理的分支機制。 – TCopple

+0

感謝澄清! – FuzzyAmi

0

clone過濾器是在你的情況其實是不必要的。

有幾件事情,我會建議:

  • 請簡化你的配置,直到你得到你的「基地」工作。不要設置可選參數,現在
  • 確保type相匹配
  • 使用標籤,它們是一個救星
  • 當「grepping」你應該保持一致,並一直使用grok或總是使用grep。我的選擇是grok

其他一些技巧... 選擇要基於什麼標籤,他們已成立與您的郵件該怎麼做。這裏是我的一個配置片段的例子:

grok{ 
    type => "company" 
    pattern => ["((status:new))"] 
    add_tag => ["company-vprod-status-new", "company-vprod-status-new-%{company}", "company-vprod-status-new-%{candidate_username}"] 
    tags=> "company-vprod-status-change" 
} 
output { 
elasticsearch { 
    host => "127.0.0.1" 
    type => "company" 
    index => "logstash-syslog-%{+YYYY.MM.dd}" 
} 
statsd { 
    host => "graphite.test.company.com" 
    increment => ["vprod.statuses.new.all", "vprod.statuses.new.%{company}.all"] 
    tags => ["company-vprod-status-new"] 
} 
} 

另外,真的很關注你的類型。如果將type屬性設置爲不存在的值,則該塊永遠不會被觸發。這就是爲什麼我更喜歡使用標籤,除非有明確的使用類型的原因。

+0

所有重要的提示,但答案不提供我的問題的實際答案。問題最終是所有的消息都通過了grep過濾器,而那些沒有通過它的消息被簡單地丟棄了。 – TCopple

+0

似乎還有另一個問題,而不是你剛剛提到的問題。您的ES塊已配置爲從「全部」類型獲取所有消息,而不是基於標籤進行過濾。因此,你在那裏的一切都是正確的,你提到的東西似乎不是罪魁禍首。如果您的解決方案有效,那真的很棒,但根據您提供的信息,它似乎不是一個甚至可以考慮的解決方案。也許你可以編輯你的答案來解釋你的解決方案爲什麼起作用以及解決了什麼? – Adam

0

您還可以添加:

drop => false 

在grep的節結束(如果你還在用grep即是)

相關問題