回答

4

我不能說,如果這是最好的方法還是不行,但是當我開始尋找類似的功能,我找不到任何容易可用的框架。我發現這個項目:

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0

,並開始貢獻,因爲它沒有做的一切,我想要的東西,也不會輕易擴展。現在2.0版本非常可靠,我們在公司每天處理/索引300M +事件時將其用於生產。

這不是一個自我推銷:) - 只是分享我們如何做同一類型的工作。當然,現在可能還有其他選擇。

+0

注:最新的進展是移動到該回購現在,它有泊塢窗和搖籃支持:https://github.com/BigDataDevs/kafka-elasticsearch-consumer – Marina

0

我使用火花流,這是一個相當簡單的實現使用Scala。

1

https://github.com/confluentinc/kafka-connect-elasticsearch

或者你可以試試這個源

https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer

運行作爲一個標準的JAR

** 1。將代碼下載到$ INDEXER_HOME目錄中。

** 2。 cp $ INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties文件 - 按照註釋中的解釋更新所有相關屬性

** 3。 CP $ INDEXER_HOME/src目錄/主/資源/ logback.xml.template /your/absolute/path/logback.xml

指定目錄要存儲在日誌:

調整最大尺寸和數量的值根據需要記錄日誌文件

** 4。建立/創建的應用程序的jar(請確保您已安裝MAVEN):

cd $INDEXER_HOME 
mvn clean package 

卡夫卡-ES-索引-2.0.jar將在$ INDEXER_HOME/bin目錄下創建的。所有依賴關係將被放置到$ INDEXER_HOME/bin/lib中。所有JAR依賴關係都通過kafka-es-indexer-2.0.jar清單鏈接。

** 5。編輯你的$ INDEXER_HOME/run_indexer.sh腳本: - 根據需要修改它的可執行文件(chmod a + x $ INDEXER_HOME/run_indexer.sh) - 根據你的環境更新標記爲「CHANGE FOR YOUR ENV」的屬性

** 6。運行應用程序[使用JDK1.8]:

./run_indexer.sh