任何解決此問題的方法?我無法閱讀KAFKA-AVRO Schema消息。 Iam試圖從logstash發送消息到KAFKA到HDFS。無法讀取卡夫卡 - Avro Schema消息
以下是高科技棧:
- Logstash 2.3 - 目前生產的版本
- 匯合3.0。
- 插件: a。 Logstash-kafka-Output插件 b。 Logstash編解碼器,Avro公司。
- 飼養員:3.4.6
- KAFKA:0.10.0.0
Logstash配置文件看起來是這樣的:
input {
stdin{}
}
filter {
mutate {
remove_field => ["@timestamp","@version"]
}
}
output {
kafka {
topic_id => 'logstash_logs14'
codec => avro {
schema_uri => "/opt/logstash/bin/schema.avsc"
}
}
}
的schema.avsc文件看起來是這樣的:
{
"type":"record",
"name":"myrecord",
"fields":[
{"name":"message","type":"string"},
{"name":"host","type":"string"}
]
}
執行以下命令:
個開始動物園管理員在其自身終端
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
2啓動卡夫卡在其自身終端
在自己的終端3啓動模式註冊表
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
4從logstash目錄,運行以下命令
bin/logstash -f ./bin/logstash.conf
5鍵入您希望上面的命令 前跑後,發送到卡夫卡的日誌信息:「Hello World」的
6從消費主題卡夫卡
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
請讓我知道如何解決這個問題
臨屋nks, Upendra