2016-06-29 250 views
1

這是我從java客戶端構建卡夫卡消費者的代碼。卡夫卡消費者不消費

def buildConsumer[Key, Value](
    configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
    implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value] 
): KafkaJavaConsumer[Key, Value] = { 
    val settingsMap: Map[String, Object] = Map(
     "bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}", 
     "group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"), 
     "enable.auto.commit" -> "true", 
     "auto.commit.interval.ms" -> commitInterval.toString, 
     "auto.offset.reset" -> "earliest" 
    ) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object]) 
    val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer) 
    consumer.subscribe(Seq(configuration.topic).asJava) 
    consumer 
    } 

我的卡夫卡在端口6050上運行,我已經在控制檯上測試了它,從那個特定的端口產生和使用。林想知道我的問題是否與上面的配置有關。我還用EmbeddedKafka框架測試了上面的代碼,問題似乎與運行實際的kafka服務器有關。

編輯:

我忘了補充一點,我有多個消費者(通過不同group.id的)來自同一個經紀人消費,不知道這就是問題所在。

+0

您能否在啓用DEBUG模式的情況下粘貼使用者日誌? –

回答

1

確保,

號分區的消費情況的專題> =號 組

否則,一些消費情況在集團榮獲」分配任何分區。

要檢查分區的數量,使用kafka-topics.sh命令

> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe Topic:test PartitionCount:6 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 5 Leader: 0 Replicas: 0 Isr: 0

0

林仍不能確定爲是什麼問題,但通過刪除飼養員數據文件夾和所有的卡夫卡原木,消費者/生產者開始按預期工作。我認爲這可能與我刪除日誌文件以清除主題而不使用用於主題刪除的正式kafka管理工具的問題有關。