2016-10-26 78 views
0

我有一個管道,創造每分鐘五個線程,每個線程執行以下操作:卡夫卡重啓消費者問題時往往

  1. 使用創建KafkaConsumer ZookeeperConsumerConnector
  2. 直到有分鐘通過
消費消息

一旦分鐘到達,線程將被終止並重復該過程。請注意,所有線程都使用相同的groupId

一段時間後,我30分鐘和幾個小時之間說了,我開始看到這個例外,每個線程和他們停止消費消息:sycedRebalance(kafka.consumer.ZookeeperConsumerConnector)期間

錯誤 卡夫卡。 common.ConsumerRebalanceFailedException:在kafka.consumer.ZookeeperConsumerConnector上重試4次後無法重新平衡$ zKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:633) at kafka.consumer.ZookeeperConsumerConnector $ ZKRebalancerListener $$ anon $ 1.run(ZookeeperConsumerConnector.scala:551)

即使在所有線程被殺害,卡夫卡消費者關閉,並創建新的。

如果我運行這使線程始終活着,它似乎沒有任何問題。

卡夫卡是否應該支持這樣一種情況:給定groupId的消費者會在一段時間後不斷創建和移除?

我在使用卡夫卡0.8.2.1

謝謝!

+1

基本上它應該工作。但你的模式首先是很奇怪的。 –

+0

你可以在server.log中搜索「rebalance期間的異常」或「Rebalancing attempt failed」嗎?需要確定是什麼導致了失敗。 – amethystic

回答