2017-08-25 95 views
0

我有一個傳統的基於C++的系統,它會吐出支持融合的Avro模式註冊表格式的二進制編碼的Avro數據。在我的Java應用程序中,我使用KafkaAvroDeserializer類成功反序列化消息,但無法打印出消息。無法打印Kafka Avro解碼的消息

private void consumeAvroData(){ 
    String group = "group1"; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "http://1.2.3.4:9092"); 
    props.put("group.id", group); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", LongDeserializer.class.getName()); 
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName()); 
    // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false"); 
    props.put("schema.registry.url","http://1.2.3.4:8081"); 
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props); 

    consumer.subscribe(Arrays.asList(TOPIC_NAME)); 
    System.out.println("Subscribed to topic " + TOPIC_NAME); 

    while (true) { 
     ConsumerRecords<String, GenericRecord> records = consumer.poll(100); 
     for (ConsumerRecord<String, GenericRecord> record : records) 
     { 
      System.out.printf("value = %s\n",record.value()); 
     } 
    } 
} 

我得到的輸出是

{"value":"�"} 

,這是爲什麼我不能打印反序列化的數據?任何幫助感謝!

回答

2

用於匯合阿夫羅串行導線格式記錄在這裏在後跟一個4字節模式ID爲(目前始終爲0),題爲「有線格式」

http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

這是一個單魔字節的部由Schema Registry返回,然後是一組字節,這些字節是Avro二進制編碼中的Avro序列化數據。

如果您將該消息讀取爲ByteArray並打印出前5個字節,您將知道這是否爲Confluent Avro序列化消息。應該是0,然後是0001或其他一些Schema ID,您可以檢查它是否位於Schema Registry中用於此主題。

如果不是這種格式,那麼消息可能以另一種方式序列化(沒有Confluent模式註冊表),並且您需要使用不同的解串器,或者可能從消息值中提取完整模式,甚至需要獲取原始模式來自其他來源的文件能夠解碼。

+0

感謝您的回覆!我嘗試手動解析字節數組(不使用Confluent解串器),我可以打印魔術字節Schema ID,但出於某種原因,我無法打印數據。 – KarthikJ

+0

該數據是以二進制格式。你不能打印出來。模式ID是否與模式註冊表中爲此主題配置的內容匹配? –

+0

是的。在替代方法中,我使用avsc文件來解碼傳入的分析數據,然後我試着打印出GenericRecord。我可以看到第一個字節爲神奇字節,2,3,4,5字節作爲模式ID,其餘的(6直到數組-1),我把它當作數據並使用Avro bytearray解串器來查看數據 – KarthikJ