我有要求從主題讀取消息,批量並將批量推送到外部系統。如果批處理因任何原因失敗,我需要再次使用同一組消息並重復該過程。因此,對於每個批次,每個分區的起點和終點都存儲在數據庫中。爲了實現這個目標,我們根據存儲的先前偏移量,通過爲讀者分配分區,爲每個分區創建一個卡夫卡消費者,消費者尋找該位置並開始閱讀。我已經關閉了自動提交,並且我沒有提交消費者的偏移量。對於每批次,我爲每個分區創建一個新的消費者,從存儲的最後偏移量讀取消息並將其發佈到外部系統。您是否發現在消費消息時沒有提交消息並跨批次使用同一個消費者組的問題,但是在任何時候,每個分區不會有多個消費者?消費者沒有從卡夫卡消費者提交的消息10消費者
0
A
回答
1
您的設計對我來說似乎合理。
向Kafka承諾抵消只是Kafka內部一種方便的內置機制,用於跟蹤抵消。然而,沒有任何要求使用它 - 你也可以使用任何其他機制來跟蹤偏移量(就像在你的情況下使用數據庫一樣)。
此外,如果您手動分配分區,則無論如何都不會有組管理。所以參數group.id
不起作用。有關更多詳細信息,請參閱http://docs.confluent.io/current/clients/consumer.html。
相關問題
- 1. 卡夫卡消費者不消費
- 2. 如何從生產者消費卡夫卡的消費者?
- 3. 卡夫卡gruop消費者
- 4. 消費消費使用卡夫卡消費者 - Java
- 5. 卡夫卡10.2新消費者與舊消費者
- 6. 卡夫卡消費者沒有消費數據
- 7. 卡夫卡消費者/生產者API
- 8. 簡單的卡夫卡消費者沒有收到消息
- 9. 消費者在卡夫卡消費的消息有哪些方式?
- 10. 卡夫卡消費者不是從
- 11. 如何關閉卡夫卡消費者一旦消費完所有消息?
- 12. 使卡夫卡消費者在訂閱之前消費現有消息
- 13. Spark Streaming中的卡夫卡消費者
- 14. 雲中的卡夫卡消費者
- 15. 卡夫卡python消費者開始時讀取所有消息
- 16. 復位消費者在卡夫卡0.10
- 17. 無法創建卡夫卡消費者
- 18. 卡夫卡消費者同步行爲
- 19. 問題在消費者卡夫卡
- 20. 暫停高級卡夫卡消費者
- 21. 卡夫卡消費者行爲
- 22. 關閉卡夫卡消費者
- 23. 卡夫卡消費者行爲
- 24. 春季集成卡夫卡消費者
- 25. 如何暫停卡夫卡消費者?
- 26. 卡夫卡消費者 - Java客戶端
- 27. 卡夫卡消費者與JAVA
- 28. 卡夫卡0.90消費者堅持
- 29. 卡夫卡kafka.common.MessageSizeTooLargeException在消費者
- 30. 卡夫卡消費者滯後JMX