2017-05-15 149 views
0

我需要一些Logstash的幫助。我目前有以下Logstash配置工作。當[message]標籤中有「令牌驗證失敗」時,它會發送一封電子郵件,表明身份驗證問題。Elkstack Logstash - 如何通過電子郵件發送閾值警報

input { 

    tcp { 
    codec => "json" 
    port => 5144 
    tags => ["windows","nxlog"] 
    type => "nxlog-json" 
    } 

} # end input 

filter { 

    if [type] == "nxlog-json" { 
    date { 
     match => ["[EventTime]", "YYYY-MM-dd HH:mm:ss"] 
     timezone => "Europe/London" 
    } 
    mutate { 
     rename => [ "AccountName", "user" ] 
     rename => [ "AccountType", "[eventlog][account_type]" ] 
     rename => [ "ActivityId", "[eventlog][activity_id]" ] 
     rename => [ "Address", "ip6" ] 
     rename => [ "ApplicationPath", "[eventlog][application_path]" ] 
     rename => [ "AuthenticationPackageName", "[eventlog][authentication_package_name]" ] 
     rename => [ "Category", "[eventlog][category]" ] 
     rename => [ "Channel", "[eventlog][channel]" ] 
     rename => [ "Domain", "domain" ] 
     rename => [ "EventID", "[eventlog][event_id]" ] 
     rename => [ "EventType", "[eventlog][event_type]" ] 
     rename => [ "File", "[eventlog][file_path]" ] 
     rename => [ "Guid", "[eventlog][guid]" ] 
     rename => [ "Hostname", "hostname" ] 
     rename => [ "Interface", "[eventlog][interface]" ] 
     rename => [ "InterfaceGuid", "[eventlog][interface_guid]" ] 
     rename => [ "InterfaceName", "[eventlog][interface_name]" ] 
     rename => [ "IpAddress", "ip" ] 
     rename => [ "IpPort", "port" ] 
     rename => [ "Key", "[eventlog][key]" ] 
     rename => [ "LogonGuid", "[eventlog][logon_guid]" ] 
     rename => [ "Message", "message" ] 
     rename => [ "ModifyingUser", "[eventlog][modifying_user]" ] 
     rename => [ "NewProfile", "[eventlog][new_profile]" ] 
     rename => [ "OldProfile", "[eventlog][old_profile]" ] 
     rename => [ "Port", "port" ] 
     rename => [ "PrivilegeList", "[eventlog][privilege_list]" ] 
     rename => [ "ProcessID", "pid" ] 
     rename => [ "ProcessName", "[eventlog][process_name]" ] 
     rename => [ "ProviderGuid", "[eventlog][provider_guid]" ] 
     rename => [ "ReasonCode", "[eventlog][reason_code]" ] 
     rename => [ "RecordNumber", "[eventlog][record_number]" ] 
     rename => [ "ScenarioId", "[eventlog][scenario_id]" ] 
     rename => [ "Severity", "level" ] 
     rename => [ "SeverityValue", "[eventlog][severity_code]" ] 
     rename => [ "SourceModuleName", "nxlog_input" ] 
     rename => [ "SourceName", "[eventlog][program]" ] 
     rename => [ "SubjectDomainName", "[eventlog][subject_domain_name]" ] 
     rename => [ "SubjectLogonId", "[eventlog][subject_logonid]" ] 
     rename => [ "SubjectUserName", "[eventlog][subject_user_name]" ] 
     rename => [ "SubjectUserSid", "[eventlog][subject_user_sid]" ] 
     rename => [ "System", "[eventlog][system]" ] 
     rename => [ "TargetDomainName", "[eventlog][target_domain_name]" ] 
     rename => [ "TargetLogonId", "[eventlog][target_logonid]" ] 
     rename => [ "TargetUserName", "[eventlog][target_user_name]" ] 
     rename => [ "TargetUserSid", "[eventlog][target_user_sid]" ] 
     rename => [ "ThreadID", "thread" ] 

    } 
    mutate { 
     remove_field => [ 
        "CurrentOrNextState", 
        "Description", 
        "EventReceivedTime", 
        "EventTime", 
        "EventTimeWritten", 
        "IPVersion", 
        "KeyLength", 
        "Keywords", 
        "LmPackageName", 
        "LogonProcessName", 
        "LogonType", 
        "Name", 
        "Opcode", 
        "OpcodeValue", 
        "PolicyProcessingMode", 
        "Protocol", 
        "ProtocolType", 
        "SourceModuleType", 
        "State", 
        "Task", 
        "TransmittedServices", 
        "Type", 
        "UserID", 
        "Version" 
        ] 
    } 
    } 

} 

output { 
    elasticsearch { 
    hosts => ["localhost:9200"] 
    } 

if "Token validation failed" in [message] { 

email { 

address => "smtp01.domain.com" 
to => "[email protected]" 
from => "[email protected]" 
subject => "Auth Issue" 
body => "Auth Issue" 
port => 25 
use_tls => false 
via => "smtp" 

} 
} 

} # end output 

我想知道如何獲得電子郵件發送只有當消息標記在一分鐘內「令牌驗證失敗」的10倍。如果它有9個或以下的條目,它不會發送任何電子郵件。我需要設置什麼配置才能使其工作?

回答

0

有幾種方法可以實現這一點。

答:您可以使用XPack警報(以前稱爲看守)或ElastAlert爲this answer

B.描述您可以爲了跟蹤和計數「令牌驗證失敗」消息使用aggregate Logstash filter中描述this answer。您只需

aggregate { 
    task_id => "%{[eventlog][target_logonid]}" 
    code => "map['failed_count'] ||= 0; map['failed_count'] += 1;" 
    push_map_as_event_on_timeout => true 
    timeout => 60 # 1 minute timeout 
    timeout_tags => ['_aggregatetimeout'] 
    timeout_code => "event.set('token_failed', event.get('failed_count') >= 10)" 
    } 

然後你就可以發送電子郵件只if [token_failed]

C.可以以計數和緩存的次數「令牌驗證失敗」發生消息使用ruby Logstash filter。它基本上與B相同,但通過在Ruby代碼中自己實現邏輯。

D.您可以使用metrics Logstash filter來計算消息字段中具有「令牌​​驗證失敗」事件的速率。

metrics { 
    meter => [ "message" ] 
    rates => [ 1 ] 
    add_tag => "metric" 
    } 

然後在你的輸出,你可以簡單地使用這樣的計量信息:

if "metric" in [tags] and [Token validation failed][count] >= 10 { 
    email { 
     ... 
    } 
    } 

注意與解決方案B和C,你不能more than one worker(即-w 1)推出Logstash。我已經提交了一個enhancement request來「解決」這個問題,但是由於Logstash團隊已經擁有了大量的TODO,我們將看到會發生什麼。