2017-10-05 32 views
0

最終目標:連接Elasticsearch和kafka,並將ES索引中正在進行的更改事件接收到kafka。從卡夫卡,我有聽衆做進一步處理。ElasticSearch到Kafka事件 - 每次更改時使用Logstash

方法:我使用Logstash輸入和輸出插件。這是配置代碼。

input { 
     elasticsearch { 
       hosts => ["localhost:9200"] 
       index => "liferay-20116" 
     } 
} 
output { 
     kafka { 
     topic_id => "elastic-topic-index" 
     codec => json 
     } 
} 

它正在工作,但有一個奇怪的問題。

當我收聽kafka時,它會從ES讀取所有文檔,大約176個文檔。

一旦它讀取,它停止一段時間說2秒,然後再讀取整個176文檔!

我不知道是什麼問題,這是由於Logstash行爲還是卡夫卡行事怪異?

任何幫助,將不勝感激!

+0

你要發送,如果匹配查詢收到的文件? 在這種情況下,看看https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-percolate-query.html –

回答

0

這是這個插件的標準行爲 - 它將數據匹配到給定的查詢。如果您只想更改文檔,唯一的解決方法是建立關於自己改變內容的知識 - 例如,您需要有條目時間戳,然後將這些知識併入發送給ES的查詢中。

+0

所以你的意思是它的ElasticSearch輸入插件是推動數據,即每176 doc每隔2秒鐘向kafka說?我認爲這應該發生在ES有任何變化的情況下,而不是每2秒左右。 –

+0

我不知道爲什麼插件每2秒推一次,但插件只是執行查詢 - 它不知道ES中發生了什麼變化。 –

+0

謝謝。我其實已經明白了。這不是肯定會工作。在同一篇博客文章上撰寫並分享:) –