2016-11-09 25 views
1

我在java中實現了一個KafkaConsumer,目前它永遠不會退出.poll方法。當我在調試模式下深入瞭解源代碼時,我發現它在AbstractCoordinator.ensureCoordinatorKnown()的while循環中停滯不前,因爲從未找到協調器。KafkaConsumer永遠不會退出.poll方法 - GroupCoordinatorNotAvailableException

在迴路中從sendGroupMetadataRequest()返回的未來第一次通過org.apache.kafka.clients.consumer.internals.SendFailedException失敗,然後隨後通過org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.繼續失敗。有誰知道爲什麼會發生這種情況?

如果我使用控制檯生產者/消費者,我能夠成功地發送和接收消息,只有當我使用我的KafkaConsumer實現時。另外,消費者確實在我的兩臺服務器上工作,所以我知道這不是消費者的實現。

這裏是我的消費與創建屬性:

Properties props = new Properties(); 
props.put("bootstrap.servers", "myserver:9000); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("group.id", groupId); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 

編輯:

主題的消費者開始之前肯定是創建。

編輯2: 我刪除了羣集中的所有代理並重新創建它們,現在我在另一個點上失敗。在AbstractCoordinator.ensureActiveGroup()試圖重新加入時,從performGroupJoin()返回的未來與org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.反覆失敗。仍然不確定發生了什麼事。

編輯3: 我刪除了經紀人,並用不同的ID重建他們現在.poll()方法返回,它的成功消費消息。我仍然想知道爲什麼它首先失敗,但我可以確保它不會再發生。

+0

哪個版本的卡夫卡?經紀人和消費者的版本相同嗎? Kafka是否真的在myserver上收聽:9000(默認爲9092)?你能通過telnet從消費者機器連接到Kafka嗎? –

+0

我正在使用Kafka 0.9.1。是的,kafka真的在那裏傾聽,我使用同一個經紀人爲我創建的控制檯消費者作爲java。只是使用telnet,我不知道它是否在服務器上運行。當我已經知道我可以從控制檯用戶那裏獲得消費時,連接到它的證明會證明什麼? – annedroiid

回答

0

刪除代理並創建新代理解決了問題。儘管如此,經紀人仍然不確定是否出了問題。

相關問題