2016-07-29 22 views

回答

0

卡夫卡9:

  1. 如果設置組ID爲你的消費者,卡夫卡將存儲提交(處理)的偏移量爲您服務。這工作,如果你在卡夫卡 read more
  2. 使用新的消費者,如果你總是希望從最新的偏移讀取,你可以指定OffsetResetStrategy.LATEST
+1

如果已經存儲偏移量,OffsetResetStrategy將被忽略 – serejja

0

要完成納塔利婭的答案,我會說,你可能不關心存儲偏移量,你只是想要消費最新的消息。

要使用大多數消費者實現(包括0.8.4「老」消費者和「新」的消費者在0.9.x版本及以上)實現這種行爲,你需要做兩件事情:

  1. 套裝將組ID設置爲隨機值,這樣每次消費者啓動它時都不能從任何地方恢復偏移量,這將觸發「偏移重置」請求。
  2. OffsetRequestStrategy設置爲latest(或者您在客戶端使用的任何其他名稱)爲latest,這樣當您的客戶端向Kafka請求可用的偏移量時,它將獲得日誌中最後一條(最新)消息的偏移量。
+0

如果OP對存儲偏移量不感興趣,使用'KafkaConsumer.assign()'而不是'subscribe()'然後只是'seekToEnd ()'? – Harald

+0

也許,但他需要得到分區分配自己 – serejja

+0

@serejja是的,我嘗試設置組ID爲新名稱和(auto.offset.reset =最大)。有效。但是我有一些現有的消費者,並且我想要所有這些消費者都使用同一個組ID。我們不能解決我們有相同組ID的問題嗎? – Priyanka

1

最簡單的方法是禁用自動提交(即auto.commit.enable=false),並在消費者配置中使用auto.offset.reset=latest(或=largest較舊版本的卡夫卡)。

在卡夫卡的流程如下:

  1. 啓動消費
  2. 消費者尋找一個有效的承諾offse
    • 如果找到了,如果沒有找到它恢復從那裏
    • 處理,根據「auto.offset.reset」開始處理

因此,只要您的客戶羣有一個有效的提交偏移量,「auto.offset.reset」根本就沒有任何作用。因此,你也不應該手動提交。

如果已經存在提交的偏移量,則需要手動將其刪除,然後才能重新啓動使用者,如果要從當前偏移量讀取而不是處理和舊數據。 (或者使用一個新的group.id,因爲您知道沒有提交的偏移量。)

作爲所有這些的替代方法,您還可以在消費者中「設法結束」每個分區。但是這會讓你的代碼更加複雜,如果你的消費者組沒有發生任何提交,就可以避免它。

+0

爲什麼我們不應該手動提交?如果我們不手動執行,什麼時候會發生。我嘗試設置(auto.commit.enable = false)和uto.offset.reset =最大,並且具有與以前相同的組ID,但它仍然從頭閱讀。 – Priyanka

+0

更新了我的答案。這有意義嗎? –

1

對於卡夫卡0。10(或更早可能),你可以這樣做:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumer = new KafkaConsumer<>(properties); 
consumer.seekToEnd(Collections.emptySet()); 

這將關閉存儲消費者(因爲你不使用它)的經紀人偏移,並尋求所有分區的最新位置。

+1

您可能會補充說,有必要使用尚未提交讀取偏移量的使用者組。 – Schleichardt

+0

確實它不會刪除任何現有的存儲偏移量。但是不會使用seek只是覆蓋這個? 我意識到OP沒有定義「電流偏移」的含義。我的回答假設她希望最新發布。如果她的意思是「最新消費」,那麼需要啓用自動提交,並且每次運行時消費者組名稱都必須相同。 – AutomatedMike

相關問題