0

我想了解一些基本的Kafka概念,以便我可以正確監視基於KafkaStreams的應用程序的進度。Kafka重置工具消費者偏移不重置爲零

專門用於調試目的,我需要能夠讓我的應用程序重新使用整個主題。爲此我使用了reset tool

在執行腳本查看一些輸入主題的卡夫卡管理器後,我看到Consumer Offset已經減少,而Lag已經增加(這是有道理的)。雖然Consumer Offset不會爲零。我試圖解釋這一點,但我還沒有找到關於卡夫卡經理中的Consumer OffsetLogsize是指什麼的具體解釋。

爲了使它適合我所看到的,我假設Logsize是自開始以來放置到主題中的消息總量,但不一定是當前在主題中的消息量。有些人可能因年齡超過保留期而被拋棄。我對嗎?

如果不是,那麼在運行某些輸入主題的重置工具後,我觀察到Consumer Offset等於Logsize(而不是零)並且Lag爲零?

+0

你確定主題中的第一個偏移量爲0? – Natalia

回答

2

我對yahoo-kafka-manager不熟悉,但是,您也可以使用bin/kafka-consumer-groups.sh(Kafka自帶的工具)。有LOG-END-OFFSET意味着你所描述的。從命名的角度來看,如果Logsize與「日誌結束偏移量」相同或者分區中最高和最低偏移量之間的差異,我不清楚。

在執行腳本查看Kafka管理器中的一些輸入主題後,我發現Consumer Offset已經減少並且延遲增加。

這很有道理 - 因爲「滯後」是「日誌結束偏移量」和「承諾偏移量」的差異,所以在重置應用程序之後,滯後量應該增加。 但是,我不確定爲什麼承諾的消費者羣體抵消不是零(你可以非常使用 bin/kafka-consumer-group.sh - 也許 yahoo-kafka-manager報告不同的東西)。

更新:但是,該工具不會將偏移量設置爲零,而是設置爲「開始日誌」。 (該文檔是不正確的。)

另外請注意,您重置您的應用程序,然後重新啓動後auto.offset.reset策略可能在打勾([承諾]偏移 可能是無效的,如果日誌截斷了)。這能解釋你觀察到的行爲嗎?

本博客文章也可能有助於瞭解進一步的細節:https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

+0

通過將數據寫入一些測試主題我驗證了我對關於Logsize的含義的假設是正確的。但我認爲'auto.offeset.reset'不需要我看到什麼。我認爲重置工具本身會選取最新的可用偏移量,如果零不可用,則不會將偏移量重置爲零。我通過將該值設置爲最新和最早並且沒有任何變化進行檢查。如果重置工具將偏移量設置爲零,那麼'auto.offset.reset'在每種情況下都會有不同的表現。重置工具是否可以這樣更新? – LetsPlayYahtzee

+1

嗯......不確定。順便說一句:文檔不完全正確。輸入主題的工具「seeksToBeginning」(參見https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L223)所以'auto.offset。當您重置後啓動Streams應用程序時,不應觸發「重置」(有什麼意義 - 文檔不正確:( - 我將打開一個PR來修復下一版本的文檔) –

相關問題