2015-02-06 21 views
1

我已經設置了從kafka服務器獲取輸入數據的風暴拓撲。我用卡夫卡風暴包來獲取數據。我在本地集羣中成功實現了kafka服務器和風暴拓撲之間的連接,但是我在從kafka服務器檢索數據時遇到了一些問題。Storm Kafka Spout未在本地集羣中提交偏移量,spout會重複檢索相同的消息

卡夫卡脫粒機反覆檢索相同的消息在運行時,即使我設置spoutconfig.forceFromStart=falsespoutconfig.startOffsetTime =-1

注:當我停止並重新啓動基於最新的偏移數據被正確發送集羣。

回答

2

我自己想通了,問題是用outputcollectorack()的方法。我已經實施了螺栓收集器BaseBasicBolt,它沒有承認kafkaspout。我用BaseRichBolt替換並手動製作this.collector.ack(tuple)

現在它的工作很好

相關問題