我們將實施卡夫卡發佈訂閱系統。如何處理卡夫卡集羣的故障
現在,在最糟糕的情況下 - 如果某個特定主題的所有卡夫卡經紀商都下跌 - 會發生什麼?
我試過這個......發佈者在默認超時後檢測到元數據提取&如果不成功則拋出異常。
在這種情況下,我們可以在修復Kafka後監視異常並重新啓動Publisher。
但是,消費者呢 - 卡夫卡倒臺後似乎沒有任何例外。我們根本不能要求所有消費者重新啓動他們的系統。有什麼更好的方法來解決這個問題?
我們將實施卡夫卡發佈訂閱系統。如何處理卡夫卡集羣的故障
現在,在最糟糕的情況下 - 如果某個特定主題的所有卡夫卡經紀商都下跌 - 會發生什麼?
我試過這個......發佈者在默認超時後檢測到元數據提取&如果不成功則拋出異常。
在這種情況下,我們可以在修復Kafka後監視異常並重新啓動Publisher。
但是,消費者呢 - 卡夫卡倒臺後似乎沒有任何例外。我們根本不能要求所有消費者重新啓動他們的系統。有什麼更好的方法來解決這個問題?
如果消費者(0.9.x版本的版本)是輪詢和集羣宕機應該得到下面的異常
java.net.ConnectException: Connection refused
你可以繼續投票直至集羣又回來了,沒有必要重啓消費者,它會重新建立連接。
但是,消費者呢 - 一旦卡夫卡宕機,他們似乎沒有任何例外 。我們根本不能要求「全部」消費者重新啓動他們的系統。有什麼更好的方法來解決這個問題?
是的,消費者不會得到任何例外,行爲按設計工作。但是,您不需要重新啓動所有消費者,只需確保您的邏輯消費者定期撥打poll()
方法呼叫。消費者的設計方式不會受到影響,即使沒有羣集存在。請考慮以下步驟來了解實際發生的情況:
1:所有集羣都關閉,沒有活動集羣。
2:consumer.poll(timeout) // This will be called form you portion of code
3:KafkaConsumer.java
內poll()
方法調用,下面的調用將發生的順序。
poll() --> pollOnce() --> ensureCoordinatorKnown() --> awaitMetaDataUpdate()
我突出顯示了在內部執行邏輯檢查後調用的主要方法調用。現在,在這一點上,您的消費者將等待羣集再次啓動。
4:再次羣集或重新啓動
5:消費者將收到通知,並開始爲常是集羣下山之前再次合作。
注意: - 消費者將開始接收來自上次偏移量提交的消息,成功收到的消息不會被複制。
描述的行爲適用於(0.9.x版本)
聽起來像我需要的......這可以在V8.2.1中完成嗎?如果是這樣,如何啓用? – nikel
我認爲既然它是一個新系統,你使用的是最新的消費者,我很抱歉,但我不熟悉舊消費者。 – Nautilus