
How to preprocess documents before indexing?

I am collecting tweets with Logstash and Elasticsearch, using the Twitter input plugin. My problem is that I receive documents from Twitter and would like to do some preprocessing before indexing them. Let's say this is a document I get from Twitter:

{ 
    "tweet": { 
     "tweetId": 1025, 
     "tweetContent": "Hey this is a fake document for stackoverflow #stackOverflow #elasticsearch", 
     "hashtags": ["stackOverflow", "elasticsearch"], 
     "publishedAt": "2017 23 August", 
     "analytics": { 
      "likeNumber": 400, 
      "shareNumber": 100, 
     } 
    }, 
    "author":{ 
     "authorId": 819744, 
     "authorAt": "the_expert", 
     "authorName": "John Smith", 
     "description": "Haha it's a fake description" 
    } 
} 

Now, out of this document that Twitter gives me, I would like to generate two documents. The first one would be indexed at twitter/tweet/1025:

# The id for this document should be the one from tweetId `"tweetId": 1025` 
{ 
    "content": "Hey this is a fake document for stackoverflow #stackOverflow #elasticsearch", # this field has been renamed 
    "hashtags": ["stackOverflow", "elasticsearch"], 
    "date": "2017/08/23", # the date has been formated 
    "shareNumber": 100 # This field has been flattened 
} 

The second one would be indexed at twitter/author/819744:

# The id for this document should be the one from authorId `"authorId": 819744 ` 
{ 
    "authorAt": "the_expert", 
    "description": "Haha it's a fake description" 
} 

I have defined my output as follows:

output {
    stdout { codec => dots }
    elasticsearch {
        hosts => [ "localhost:9200" ]
        index => "twitter"
        document_type => "tweet"
    }
}

How can I process the information coming from Twitter this way?

EDIT:

So my full configuration file would look like this:

input {
    twitter {
        consumer_key => "consumer_key"
        consumer_secret => "consumer_secret"
        oauth_token => "access_token"
        oauth_token_secret => "access_token_secret"
        keywords => [ "random", "word"]
        full_tweet => true
        type => "tweet"
    }
}
filter {
    clone {
        clones => ["author"]
    }
    if [type] == "tweet" {
        mutate {
            remove_field => ["authorId", "authorAt"]
        }
    } else {
        mutate {
            remove_field => ["tweetId", "tweetContent"]
        }
    }
}
output {
    stdout { codec => dots }
    if [type] == "tweet" {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "twitter"
            document_type => "tweet"
            document_id => "%{[tweetId]}"
        }
    } else {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "twitter"
            document_type => "author"
            document_id => "%{[authorId]}"
        }
    }
}

Answer


You can use the clone filter plugin in Logstash. It emits a copy of each event with its type field set to the clone name, so the original and the clone can then be pruned and routed differently.

Here is a sample Logstash configuration file that takes a JSON document on stdin and simply prints the resulting events on stdout:

input {
    stdin {
        codec => json
        type => "tweet"
    }
}
filter {
    mutate {
        add_field => {
            "tweetId" => "%{[tweet][tweetId]}"
            "content" => "%{[tweet][tweetContent]}"
            "date" => "%{[tweet][publishedAt]}"
            "shareNumber" => "%{[tweet][analytics][shareNumber]}"
            "authorId" => "%{[author][authorId]}"
            "authorAt" => "%{[author][authorAt]}"
            "description" => "%{[author][description]}"
        }
    }
    date {
        match => ["date", "yyyy dd MMMM"]
        target => "date"
    }
    ruby {
        code => '
            event.set("hashtags", event.get("[tweet][hashtags]"))
        '
    }
    clone {
        clones => ["author"]
    }
    mutate {
        remove_field => ["author", "tweet", "message"]
    }
    if [type] == "tweet" {
        mutate {
            remove_field => ["authorId", "authorAt", "description"]
        }
    } else {
        mutate {
            remove_field => ["tweetId", "content", "hashtags", "date", "shareNumber"]
        }
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

With this input:

{"tweet": { "tweetId": 1025, "tweetContent": "Hey this is a fake document", "hashtags": ["stackOverflow", "elasticsearch"], "publishedAt": "2017 23 August","analytics": { "likeNumber": 400, "shareNumber": 100 } }, "author":{ "authorId": 819744, "authorAt": "the_expert", "authorName": "John Smith", "description": "fake description" } } 

you get these two documents:

{ 
      "date" => 2017-08-23T00:00:00.000Z, 
     "hashtags" => [ 
     [0] "stackOverflow", 
     [1] "elasticsearch" 
    ], 
      "type" => "tweet", 
     "tweetId" => "1025", 
     "content" => "Hey this is a fake document", 
    "shareNumber" => "100", 
    "@timestamp" => 2017-08-23T20:36:53.795Z, 
     "@version" => "1", 
      "host" => "my-host" 
} 
{ 
    "description" => "fake description", 
      "type" => "author", 
     "authorId" => "819744", 
    "@timestamp" => 2017-08-23T20:36:53.795Z, 
     "authorAt" => "the_expert", 
     "@version" => "1", 
      "host" => "my-host" 
} 

You could also flatten fields with a ruby script and then, where needed, rename them with the mutate filter.
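A rough sketch of that approach, reusing field names from the sample document above (adjust them to your actual event structure):

filter {
    # Flatten: copy a nested value up to the top level with a ruby filter.
    ruby {
        code => '
            event.set("shareNumber", event.get("[tweet][analytics][shareNumber]"))
        '
    }
    # Rename: mutate accepts nested field references as the source.
    mutate {
        rename => { "[tweet][tweetContent]" => "content" }
    }
}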

If you want Elasticsearch to use authorId and tweetId instead of its default IDs, you can configure the elasticsearch output with document_id.

output {
    stdout { codec => dots }
    if [type] == "tweet" {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "twitter"
            document_type => "tweet"
            document_id => "%{[tweetId]}"
        }
    } else {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "twitter"
            document_type => "author"
            document_id => "%{[authorId]}"
        }
    }
}

I edited my question to show you the full configuration file. How do I handle the renaming of fields or the flattening of fields? – mel


I edited my answer. Hope it works for you. – Imma


Yes, it works. Just one question before accepting the answer: add_field seems to turn my hashtags array into a string. Is there a way around this, so that I get '["stackOverflow", "elasticsearch"]' instead of '"stackOverflow, elasticsearch"'? I tried add_tag, but it does not seem to work like add_field. – mel
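For reference, the ruby filter already used in the configuration above is the usual way around that: event.get/event.set copy the field as-is and keep the array, whereas the %{...} sprintf notation in add_field renders it as a comma-joined string. A minimal sketch:

filter {
    # event.set preserves the array type; add_field with %{[tweet][hashtags]}
    # would flatten the array into a comma-joined string.
    ruby {
        code => '
            event.set("hashtags", event.get("[tweet][hashtags]"))
        '
    }
}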