我有一個消費者組中有兩個消費者分配了相同的kafka主題分區。我希望從消費者B內部獲得最後一次讀取偏移量,消費者A.任何想法,如何實現這一點?如何獲取消費者組的最後消費抵消?
1
A
回答
2
0
卡夫卡店偏移量由(消費者組ID,主題,分區),所以首先要注意的一點是,從卡夫卡點認爲沒有像「最後一次讀取消費者A的抵消額」那樣的東西。您可以通過Kafka使用者API獲得的所有信息都適用於給定的(組,主題,分區)。消費者API中有兩種方法可能有用。
commited():獲取給定分區的最後提交的偏移量(無論提交是由此進程還是其他進程發生的)。
position():獲取將被提取的下一條記錄的偏移量(如果具有該偏移量的記錄存在)。
如果這不是你所需要的,那麼你將不得不自己實現一些東西。假設你已經知道如何獲得最後抵消消費的讀操作,那麼消費者A應該是值存儲在某個位置,提供給消費者B.這個位置可能是
- 卡夫卡本身。例如,消費者A可以將最近的讀取偏移量發佈到 像ConsumerA-p0這樣的衆所周知的主題,而消費者B可以訂閱 這個主題。
- 動物園管理員。再次,在一個衆所周知的道路上達成一致。
- 外部數據庫。
- 更基本的選擇,如果消費者都共享同一個操作系統:IPC,在文件系統中的文件,在內存與鎖保護的變量等
+0
我同意你和理論上它看起來似乎合理,但我想知道是否有某些功能由kafka本身提供服務的目的 – swappy
+0
添加更多的細節給我的答案。我認爲這些都是你有的選擇。 –
相關問題
- 1. 使用kafka庫查找消費者抵消滯後的代碼?
- 2. 卡夫卡消費者抵消最大值?
- 3. 從kafka讀取消費者抵消的工具0.9
- 4. RabbitMQ pika消費者取消後的異步消費者心跳問題
- 5. 生產者消費者請求取消
- 6. 消費者生產者多線程消費者不會消逝
- 7. 如何從Salesforce獲取消費者密鑰和消費者密鑰
- 8. 如何在使用Semphores的生產者 - 消費者中消費?
- 9. 如何從生產者消費卡夫卡的消費者?
- 10. 卡夫卡消費者不消費
- 11. 消費消費使用卡夫卡消費者 - Java
- 12. RabbitMQ消費者
- 13. 消費者池
- 14. Kafka消費羣集環境抵消
- 15. 生產者 - 消費;消費者如何停止?
- 16. 如何查找從ActiveMQ獲取消息的消費者的IP?
- 17. 消息消費者實現
- 18. 瞭解消費者組ID
- 19. 在RabbitMQ配置消費者取消
- 20. 如何使用消費者
- 21. 消費者過濾的生產者 - 消費者阻塞隊列
- 22. 生產者/消費者線程中的油門消費者
- 23. Java生產者 - 消費者:生產者不「通知()」消費者
- 24. 卡夫卡工作/抵消協調與消費者
- 25. 消費者沒有從卡夫卡消費者提交的消息10消費者
- 26. 生產消費者
- 27. 競爭消費者
- 28. Mochiweb + AMQP消費者
- 29. SAML消費者URL
- 30. ActiveMQ消費者OutOfMemoryException
你想有一個以上的消費者在消費從單個分區平行嗎? –
@LucianoAfranllie類似的,但我可以讓消費者平行讀,但我希望只跟蹤其他消費者在卡夫卡的最後讀取偏移量。 – swappy