這是我從java客戶端構建卡夫卡消費者的代碼。卡夫卡消費者不消費
def buildConsumer[Key, Value](
configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
): KafkaJavaConsumer[Key, Value] = {
val settingsMap: Map[String, Object] = Map(
"bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
"group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
"enable.auto.commit" -> "true",
"auto.commit.interval.ms" -> commitInterval.toString,
"auto.offset.reset" -> "earliest"
) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
consumer.subscribe(Seq(configuration.topic).asJava)
consumer
}
我的卡夫卡在端口6050上運行,我已經在控制檯上測試了它,從那個特定的端口產生和使用。林想知道我的問題是否與上面的配置有關。我還用EmbeddedKafka
框架測試了上面的代碼,問題似乎與運行實際的kafka服務器有關。
編輯:
我忘了補充一點,我有多個消費者(通過不同group.id
的)來自同一個經紀人消費,不知道這就是問題所在。
您能否在啓用DEBUG模式的情況下粘貼使用者日誌? –