我正在使用卡夫卡噴口來消費消息。但如果我必須更改拓撲和上傳,那麼它會從舊郵件恢復還是從新郵件開始?卡夫卡噴口給我們特定從何處消費的時間戳,但我如何知道時間戳?卡夫卡風暴噴口更改拓撲結構並消耗舊的偏移量
回答
如果您正在使用KafkaSpout確保以下幾點:
- 在你SpoutConfig「ID」和 重新部署拓撲結構的新版本後,「zkroot」不改變風暴使用「 zkroot」,「id」將主題偏移量存儲到zookeeper中
- KafkaConfig.forceFromStart設置爲false。
KafkaSpout將偏移量存儲到zookeeper中。如果您在KafkaSpout的KafkaConfig中將forceFromStart設置爲true(可能是您首次部署拓撲時的情況),那麼在重新部署期間要非常小心,它會忽略存儲的zookeeper偏移量。確保將其設置爲false。
考慮編寫拓撲結構,以便在拓撲的main()方法執行時從屬性文件中讀取KafkaConfig.forceFromStart值。這將允許您的管理員控制是否重播Kafka消息。
spoutConfig.forceStartOffsetTime(-1);
它會選擇最新的偏移周圍的時間戳寫入啓動消費。你可以用 強制噴口總是從最新的偏移量開始,通過傳入-1,你可以強制 它通過傳入-2來從最早的偏移量開始。
基本上事件序列將是:
第一次啓動拓撲通過從開始讀取與以下屬性:
forceFromStart = true startOffsetTime = -2
上述道具將迫使它來啓動從主題開始。請記住擁有兩個屬性,因爲forceFromStart
會讓風暴讀取startOffsetTime
屬性,並使用設置的值來確定從何處開始讀取,並忽略zookeeper偏移量。
從現在起,您的拓撲將運行,zookeeper將保持偏移量。如果你的工作人員死亡,它將開始由主管開始,並開始讀取zookeeper中的偏移量。
現在如果你想重新啓動您的拓撲結構,你想從那裏它被關閉之前離開的閱讀,使用下列財產,並重新啓動拓撲:
forceFromStart = false
如果你想重新啓動您的拓撲結構,你想從主題的頭部/頂部閱讀,使用下列財產,並重新啓動拓撲:
forceFromStart = true startOffsetTime = -1
通過上述屬性,您告訴風暴不是讀取startOffsetTime
值,而是使用在關閉拓撲之前維護的zookeeper偏移量。
從現在起,每次重新啓動拓撲時,它都會從剩餘的地方讀取數據。
通過上述屬性,您可以告訴風暴忽略動物園管理員偏移量,並從最新的偏移量開始,這是該主題的提示。
- 1. 風暴 - 卡夫卡噴口慢慢消耗
- 2. 卡夫卡風暴噴口lein或Mvn
- 3. 風暴 - 拓撲結構到拓撲結構
- 4. 如何解決風暴卡夫卡噴口只消費卡夫卡的一半數據?
- 5. 卡夫卡拓撲最佳實踐
- 6. 卡夫卡噴口集成
- 7. 監測卡夫卡噴口
- 8. 卡夫卡0.9:消費從最早的卡夫卡偏移
- 9. 風暴拓撲不提交
- 10. 風暴拓撲:多對一
- 11. 卡夫卡消費者中的控制消息偏移量
- 12. 卡夫卡噴口的字段分組
- 13. 卡夫卡消耗相反的消息
- 14. 如何整合風暴和卡夫卡
- 15. 卡夫卡風暴HDFS/S3數據流
- 16. KafkaSpout(風暴)不在卡夫卡經理
- 17. 重置卡夫卡消費者的上一次偏移量
- 18. 監視風暴的拓撲結構,特別是使用JMX
- 19. 卡夫卡延遲消息消耗
- 20. 從卡夫卡流重置消費者偏移量
- 21. 卡夫卡噴口在設置多個工人時無法在風暴中確認消息
- 22. 在風暴拓撲結構上Jedis「無法獲得池資源」
- 23. 在僞集羣服務器上部署風暴拓撲結構
- 24. 如何在風暴拓撲結構中使用drools
- 25. 如何從卡夫卡的舊偏移點獲取數據?
- 26. 蔭新卡夫卡和風暴,什麼是開始學習卡夫卡和風暴
- 27. 卡夫卡源連接器得到陳舊偏移值
- 28. Apache風暴 - 帶風暴集羣的地圖拓撲
- 29. 瞭解風暴拓撲可視化
- 30. 風暴拓撲生成異常