我正在使用Java api消費者連接器。每當消費者開始閱讀某個主題時,就會從一個主題的開始處讀取內容,並且需要相當長的時間來跟上最新的事件。我們如何確保消費者讀取當前的抵消?Java:當我開始從kafka話題讀取時如何從當前偏移量讀取
回答
卡夫卡9:
- 如果設置組ID爲你的消費者,卡夫卡將存儲提交(處理)的偏移量爲您服務。這工作,如果你在卡夫卡 read more
- 使用新的消費者,如果你總是希望從最新的偏移讀取,你可以指定OffsetResetStrategy.LATEST
要完成納塔利婭的答案,我會說,你可能不關心存儲偏移量,你只是想要消費最新的消息。
要使用大多數消費者實現(包括0.8.4「老」消費者和「新」的消費者在0.9.x版本及以上)實現這種行爲,你需要做兩件事情:
- 套裝將組ID設置爲隨機值,這樣每次消費者啓動它時都不能從任何地方恢復偏移量,這將觸發「偏移重置」請求。
- 將
OffsetRequestStrategy
設置爲latest
(或者您在客戶端使用的任何其他名稱)爲latest
,這樣當您的客戶端向Kafka請求可用的偏移量時,它將獲得日誌中最後一條(最新)消息的偏移量。
最簡單的方法是禁用自動提交(即auto.commit.enable=false
),並在消費者配置中使用auto.offset.reset=latest
(或=largest
較舊版本的卡夫卡)。
在卡夫卡的流程如下:
- 啓動消費
- 消費者尋找一個有效的承諾offse
- 如果找到了,如果沒有找到它恢復從那裏
- 處理,根據「auto.offset.reset」開始處理
因此,只要您的客戶羣有一個有效的提交偏移量,「auto.offset.reset」根本就沒有任何作用。因此,你也不應該手動提交。
如果已經存在提交的偏移量,則需要手動將其刪除,然後才能重新啓動使用者,如果要從當前偏移量讀取而不是處理和舊數據。 (或者使用一個新的group.id
,因爲您知道沒有提交的偏移量。)
作爲所有這些的替代方法,您還可以在消費者中「設法結束」每個分區。但是這會讓你的代碼更加複雜,如果你的消費者組沒有發生任何提交,就可以避免它。
爲什麼我們不應該手動提交?如果我們不手動執行,什麼時候會發生。我嘗試設置(auto.commit.enable = false)和uto.offset.reset =最大,並且具有與以前相同的組ID,但它仍然從頭閱讀。 – Priyanka
更新了我的答案。這有意義嗎? –
對於卡夫卡0。10(或更早可能),你可以這樣做:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());
這將關閉存儲消費者(因爲你不使用它)的經紀人偏移,並尋求所有分區的最新位置。
您可能會補充說,有必要使用尚未提交讀取偏移量的使用者組。 – Schleichardt
確實它不會刪除任何現有的存儲偏移量。但是不會使用seek只是覆蓋這個? 我意識到OP沒有定義「電流偏移」的含義。我的回答假設她希望最新發布。如果她的意思是「最新消費」,那麼需要啓用自動提交,並且每次運行時消費者組名稱都必須相同。 – AutomatedMike
- 1. 在Kafka中讀取偏移量
- 2. kafka:從第二個主題讀取偏移recvd從第一個
- 3. 從當前UIWebview讀取HTML
- 4. 從java當前目錄讀取文件
- 5. 獲取Olson時區當前偏移量
- 6. 如何從NumberPicker讀取當前值?
- 7. InputStream從偏移量讀取數據
- 8. 如何從iframe中獲取當前的iframe偏移量?
- 9. kafka從當前時間獲取數據
- 10. 如何開始讀取二進制數據的偏移量?
- 11. 讀取當前數據從Asp.Table
- 12. 如何從XAML LinearGradientBrush的偏移量中讀取顏色?
- 13. 讀取從字節偏移量開始的一行文件直到新行
- 14. ArrayIndexOutOfBoundsException當從文件讀取
- 15. 如何使用Dapper從Oracle讀取時間偏移
- 16. NSMutableArray EXC_BAD_ACCESS當我嘗試從它讀取
- 17. 當使用BinData無法讀取時停止從文件讀取
- 18. 當我從文本文件中讀取時,如何跳過行?
- 19. 如何從中間偏移量讀取超過3GB的Java InputStream數據
- 20. 如何使卡夫卡消費者從上次消費的偏移量讀取,但不是從開始
- 21. 如何從Java讀取avro?
- 22. 當前讀取選擇線
- 23. 讀取當前表ID
- 24. 如何讀取MaxConcurrentRequestsPerCPU的當前值
- 25. 當我從p12文件讀取nsdata時提取標識
- 26. 讀取代碼點時發生偏移量問題
- 27. PHP根據GMT偏移量獲取當前本地時間
- 28. 如何從rebol腳本讀取當前目錄
- 29. 如何從Excel表格中讀取當前週數?
- 30. 將應用程序從Kafka 0.8.2.1移植到Kafka 0.9.0。讀取補償問題
如果已經存儲偏移量,OffsetResetStrategy將被忽略 – serejja