如何在消費或處理完消息後從卡夫卡得到確認。可能聽起來很愚蠢,但是有什麼辦法可以知道已經收到確認的那條消息的開始和結束偏移嗎?如何從卡夫卡獲得鳴謝
6
A
回答
1
我發現到目前爲止是0.8,他們已採取以下方式,從用於讀取偏移選擇..
kafka.api.OffsetRequest.EarliestTime()發現在數據的開始記錄並開始流式傳輸,kafka.api.OffsetRequest.LatestTime()只會傳輸新消息。
示例代碼 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
仍不能確定的確認部分
1
卡夫卡是不是真的結構要做到這一點。要理解爲什麼,請查看設計文檔here。
爲了提供一次性確認,您需要爲您的應用程序創建一些外部跟蹤系統,您可以在其中明確寫入確認並在事務ID上實施鎖定,以確保只能處理一次事務。實施系統的計算成本非常高,這是大型事務處理系統需要比較特殊的硬件並且可以說比可比系統如卡夫卡低的可擴展性的主要原因之一。
如果您不需要強大的持久性語義,您可以使用groups API粗略跟蹤最後一條消息的讀取時間。這確保每條消息至少讀取一次。請注意,由於groups API沒有爲您提供明確跟蹤應用程序自身處理邏輯的能力,因此您的實際處理保證在這種情況下相當弱。依賴冪等處理的方案在這種環境中很常見。
或者,您可以使用名字較差的SimpleConsumer API(使用起來相當複雜),它使您能夠明確地跟蹤應用程序中的時間戳。這是可以通過原生Kafka API實現的最高級別的處理保證,因爲它使您能夠跟蹤應用程序自己處理從隊列中讀取的數據。
相關問題
- 1. 如何從卡夫卡連接器獲得卡夫卡引導配置設置
- 2. 如何從卡夫卡用卡夫卡流通過間隔
- 3. 如何從卡夫卡隊列直接從卡夫卡隊列讀取數據
- 4. 如何使用Kafka 0.10獲得卡夫卡滯後?
- 5. 如何獲得卡夫卡經紀人的連接字符串
- 6. 卡夫卡:如何從副本集
- 7. 如何在卡夫卡2.8
- 8. Logstash如何比卡夫卡
- 9. 卡夫卡:如何在卡夫卡實現循環分區
- 10. 卡夫卡0.9:消費從最早的卡夫卡偏移
- 11. 卡夫卡 - 爪哇 - 從crashs
- 12. 從卡夫卡導入HBase
- 13. 如何從卡夫卡的舊偏移點獲取數據?
- 14. UnknownCodecException卡夫卡
- 15. 卡夫卡
- 16. 彈簧卡夫卡例如
- 17. 如何使卡夫卡源重新連接時,卡夫卡重新啓動
- 18. 卡夫卡與斯卡拉
- 19. 卡夫卡0.10.2消費者獲得大量副本
- 20. 卡夫卡消費者獲得關鍵值對
- 21. 卡夫卡Kubernetes/Minikube
- 22. 如何整合風暴和卡夫卡
- 23. 如何計算卡夫卡補償值?
- 24. 如何暫停卡夫卡消費者?
- 25. 如何單元測試卡夫卡流
- 26. 卡夫卡消費者無法從卡夫卡的主題訂閱
- 27. 卡夫卡的retention.ms沒有被卡夫卡0.10.2強制執行?
- 28. 卡夫卡連接或卡夫卡客戶
- 29. 春季卡夫卡表現vs本地卡夫卡api
- 30. 卡夫卡流和卡夫卡表一對多關係加入