我不太熟悉卡夫卡,但我想知道什麼是 從卡夫卡批量讀取數據的最佳方式,所以我可以使用Elasticsearch Bulk Api更快地加載數據並且可靠。我如何批量卡夫卡讀取到Elasticsearch
順便說一句,我使用Vertx我卡夫卡消費者
謝謝
我不太熟悉卡夫卡,但我想知道什麼是 從卡夫卡批量讀取數據的最佳方式,所以我可以使用Elasticsearch Bulk Api更快地加載數據並且可靠。我如何批量卡夫卡讀取到Elasticsearch
順便說一句,我使用Vertx我卡夫卡消費者
謝謝
我不能說,如果這是最好的方法還是不行,但是當我開始尋找類似的功能,我找不到任何容易可用的框架。我發現這個項目:
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer/tree/branch2.0
,並開始貢獻,因爲它沒有做的一切,我想要的東西,也不會輕易擴展。現在2.0版本非常可靠,我們在公司每天處理/索引300M +事件時將其用於生產。
這不是一個自我推銷:) - 只是分享我們如何做同一類型的工作。當然,現在可能還有其他選擇。
我使用火花流,這是一個相當簡單的實現使用Scala。
https://github.com/confluentinc/kafka-connect-elasticsearch
或者你可以試試這個源
https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer
運行作爲一個標準的JAR
** 1。將代碼下載到$ INDEXER_HOME目錄中。
** 2。 cp $ INDEXER_HOME/src/main/resources/kafka-es-indexer.properties.template /your/absolute/path/kafka-es-indexer.properties文件 - 按照註釋中的解釋更新所有相關屬性
** 3。 CP $ INDEXER_HOME/src目錄/主/資源/ logback.xml.template /your/absolute/path/logback.xml
指定目錄要存儲在日誌:
調整最大尺寸和數量的值根據需要記錄日誌文件
** 4。建立/創建的應用程序的jar(請確保您已安裝MAVEN):
cd $INDEXER_HOME
mvn clean package
卡夫卡-ES-索引-2.0.jar將在$ INDEXER_HOME/bin目錄下創建的。所有依賴關係將被放置到$ INDEXER_HOME/bin/lib中。所有JAR依賴關係都通過kafka-es-indexer-2.0.jar清單鏈接。
** 5。編輯你的$ INDEXER_HOME/run_indexer.sh腳本: - 根據需要修改它的可執行文件(chmod a + x $ INDEXER_HOME/run_indexer.sh) - 根據你的環境更新標記爲「CHANGE FOR YOUR ENV」的屬性
** 6。運行應用程序[使用JDK1.8]:
./run_indexer.sh
注:最新的進展是移動到該回購現在,它有泊塢窗和搖籃支持:https://github.com/BigDataDevs/kafka-elasticsearch-consumer – Marina