2
工作,如何使無功卡夫卡(https://github.com/akka/reactive-kafka)與合流模式註冊的Avro模式工作?下面是我的示例代碼:使反應卡夫卡與合流模式註冊的Avro模式
def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrap)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("schema.registry.url", schemaRegistryUrl)
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName)
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}
謝謝,我知道如何消費與架構註冊表卡夫卡Avro的消息時,我問的問題是如何使用它與反應性卡夫卡。它需要先構建consumerSettings,就像我上面粘貼的代碼一樣。我不知道是否有可能得到源[ConsumerMessage.CommittableMessage [字符串,AvroClass]直接因爲KafkaAvroDeseriablizer是不通用的。否則,我得到CommittableMessage [String,Object],然後將其轉換爲特定的? –
嗨,你能弄清楚這一點。我有通過akka流kafka反序列化kafka消息的確切問題。它返回一個我無法投射到我的類型的對象。 – armulator
看來這個問題並沒有被修正,並被其他人報告。 https://github.com/akka/reactive-kafka/issues/342。 –