2016-06-08

Unable to read Kafka Avro schema messages. I am trying to send messages from Logstash through Kafka to HDFS. Is there any way to solve this?

Here is the tech stack:

  1. Logstash 2.3 - the current production version
  2. Confluent 3.0
  3. Plugins: a. logstash-kafka output plugin; b. logstash-codec-avro
  4. Zookeeper: 3.4.6
  5. Kafka: 0.10.0.0

The Logstash config file looks like this:

input {
  stdin {}
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    topic_id => 'logstash_logs14'
    codec => avro {
      schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

The schema.avsc file looks like this:

{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {"name": "message", "type": "string"},
    {"name": "host", "type": "string"}
  ]
}

The following commands were executed:

  1. Start Zookeeper in its own terminal

    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

  2. Start Kafka in its own terminal

  3. Start the schema registry in its own terminal

    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

  4. From the Logstash directory, run the following command

bin/logstash -f ./bin/logstash.conf 

  5. After the above command is running, type the log message you want sent to Kafka: "Hello World"

  6. Consume the topic from Kafka

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning 
While consuming, we get the following error:

SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
Processed a total of 1 messages 
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103) 
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! 
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103) 
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! 

Please let me know how to fix this.

Thanks, Upendra

Answer

How are you writing/publishing to Kafka? You see the SerializationException because the data was not written using the schema registry (that is, with the KafkaAvroSerializer), while kafka-avro-console-consumer internally uses the schema registry (the KafkaAvroDeserializer), which expects the data in a specific format: <magic byte><schemaId><data>. If you write the Avro data with kafka-avro-console-producer, you should not get this exception; alternatively, set KafkaAvroSerializer as the key & value serializer in your producer properties and set the schema.registry.url:

Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
     io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
props.put("schema.registry.url", "http://localhost:8081");
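To make the "Unknown magic byte!" error concrete, here is a minimal, dependency-free sketch of the wire format described above: a single 0x0 magic byte, a 4-byte big-endian schema id, then the Avro-encoded payload. The schema id (42) and payload bytes here are illustrative placeholders, not real registry data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatDemo {
    static final byte MAGIC_BYTE = 0x0;

    // Frame an already-Avro-encoded payload the way the registry-aware
    // serializer does: magic byte + 4-byte schema id + payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId); // ByteBuffer is big-endian by default
        buf.put(avroPayload);
        return buf.array();
    }

    // Mimic the check a registry-aware deserializer performs before it
    // looks up the schema id.
    static int readSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8); // stand-in for Avro bytes
        byte[] framed = frame(42, payload);
        System.out.println("schema id = " + readSchemaId(framed)); // prints: schema id = 42

        // Raw Avro from the Logstash codec carries no such header, so the
        // first byte is whatever the payload happens to start with, and the
        // magic-byte check fails:
        try {
            readSchemaId(payload);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: Unknown magic byte!
        }
    }
}
```

This is why the Logstash avro codec output cannot be read by kafka-avro-console-consumer: the codec emits plain Avro bytes without the 5-byte header, so the consumer's very first check fails.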