我在java中實現了一個KafkaConsumer
,目前它永遠不會退出.poll
方法。當我在調試模式下深入瞭解源代碼時,我發現它在AbstractCoordinator.ensureCoordinatorKnown()
的while循環中停滯不前,因爲從未找到協調器。KafkaConsumer永遠不會退出.poll方法 - GroupCoordinatorNotAvailableException
在迴路中從sendGroupMetadataRequest()
返回的未來第一次通過org.apache.kafka.clients.consumer.internals.SendFailedException
失敗,然後隨後通過org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
繼續失敗。有誰知道爲什麼會發生這種情況?
如果我使用控制檯生產者/消費者,我能夠成功地發送和接收消息,只有當我使用我的KafkaConsumer實現時。另外,消費者確實在我的兩臺服務器上工作,所以我知道這不是消費者的實現。
這裏是我的消費與創建屬性:
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
編輯:
主題的消費者開始之前肯定是創建。
編輯2: 我刪除了羣集中的所有代理並重新創建它們,現在我在另一個點上失敗。在AbstractCoordinator.ensureActiveGroup()
試圖重新加入時,從performGroupJoin()
返回的未來與org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
反覆失敗。仍然不確定發生了什麼事。
編輯3: 我刪除了經紀人,並用不同的ID重建他們現在.poll()
方法返回,它的成功消費消息。我仍然想知道爲什麼它首先失敗,但我可以確保它不會再發生。
哪個版本的卡夫卡?經紀人和消費者的版本相同嗎? Kafka是否真的在myserver上收聽:9000(默認爲9092)?你能通過telnet從消費者機器連接到Kafka嗎? –
我正在使用Kafka 0.9.1。是的,kafka真的在那裏傾聽,我使用同一個經紀人爲我創建的控制檯消費者作爲java。只是使用telnet,我不知道它是否在服務器上運行。當我已經知道我可以從控制檯用戶那裏獲得消費時,連接到它的證明會證明什麼? – annedroiid