我有一個管道,創造每分鐘五個線程,每個線程執行以下操作:卡夫卡重啓消費者問題時往往
- 使用創建KafkaConsumer ZookeeperConsumerConnector
- 直到有分鐘通過
一旦分鐘到達,線程將被終止並重復該過程。請注意,所有線程都使用相同的groupId。
一段時間後,我30分鐘和幾個小時之間說了,我開始看到這個例外,每個線程和他們停止消費消息:sycedRebalance(kafka.consumer.ZookeeperConsumerConnector)期間
錯誤 卡夫卡。 common.ConsumerRebalanceFailedException:在kafka.consumer.ZookeeperConsumerConnector上重試4次後無法重新平衡$ zKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633) at kafka.consumer.ZookeeperConsumerConnector $ ZKRebalancerListener $$ anon $ 1.run(ZookeeperConsumerConnector.scala:551)
即使在所有線程被殺害,卡夫卡消費者關閉,並創建新的。
如果我運行這使線程始終活着,它似乎沒有任何問題。
卡夫卡是否應該支持這樣一種情況:給定groupId的消費者會在一段時間後不斷創建和移除?
我在使用卡夫卡0.8.2.1
謝謝!
基本上它應該工作。但你的模式首先是很奇怪的。 –
你可以在server.log中搜索「rebalance期間的異常」或「Rebalancing attempt failed」嗎?需要確定是什麼導致了失敗。 – amethystic