2017-06-19 34 views
2

工作,如何使無功卡夫卡(https://github.com/akka/reactive-kafka)與合流模式註冊的Avro模式工作?下面是我的示例代碼:使反應卡夫卡與合流模式註冊的Avro模式

def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = { 
 
    
 
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer) 
 
     .withBootstrapServers(bootstrap) 
 
     .withGroupId(groupId) 
 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
 
     .withProperty("schema.registry.url", schemaRegistryUrl) 
 
     .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName) 
 
     .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName) 
 
     .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") 
 
    
 
    Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic)) 
 
}

回答

1

你只需要配置消費者使用匯合解串器:io.confluent.kafka.serializers.KafkaAvroDeserializer.class

設置以下屬性:

  • ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG
  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

此外,你必須以至少指定一個地址指向您的架構註冊表實例添加屬性「schema.registry.url」。

最後,您有以下dependecy添加到您的項目:

  <dependency> 
      <groupId>io.confluent</groupId> 
      <artifactId>kafka-avro-serializer</artifactId> 
      <version>${io.confluent.version}</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.slf4j</groupId> 
        <artifactId>slf4j-log4j12</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 

Confluent Documentation

+0

謝謝,我知道如何消費與架構註冊表卡夫卡Avro的消息時,我問的問題是如何使用它與反應性卡夫卡。它需要先構建consumerSettings,就像我上面粘貼的代碼一樣。我不知道是否有可能得到源[ConsumerMessage.CommittableMessage [字符串,AvroClass]直接因爲KafkaAvroDeseriablizer是不通用的。否則,我得到CommittableMessage [String,Object],然後將其轉換爲特定的? –

+0

嗨,你能弄清楚這一點。我有通過akka流kafka反序列化kafka消息的確切問題。它返回一個我無法投射到我的類型的對象。 – armulator

+0

看來這個問題並沒有被修正,並被其他人報告。 https://github.com/akka/reactive-kafka/issues/342。 –