2016-09-28 35 views
1

我跟着教程從http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/,我能夠從Avro的控制檯數據插入到卡桑德拉。 現在我想擴大這種使用水槽和我有水槽在我的機器設置,這將挑選日誌文件,並把它推到卡夫卡,想我的數據插入到數據庫卡桑德拉。 在一個文本文件中,我把數據合流和卡桑德拉:獲取DataException:無法將數據反序列化Avro的,未知的魔法字節

{「id」:1,「created」:「2016-05-06 13:53:00」,「product」:「OP-DAX-P-20150201- 95.7「,」price「:94.2}

{」id「:2,」created「:」2016-05-06 13:54:00「,」product「:」OP-DAX-C-20150201- 100「,」price「:99.5}

{」id「:3,」created「:」2016-05-06 13:55:00「,」product「:」FU-DATAMOUNTAINEER-20150201-100「 ,「price」:10000}

{「id」:4,「created」:「2016-05-06 13:56:00」,「product」:「FU-KOSPI-C-20150201-100」 ,「價格」:150}

Flume正在挑選數據並將其推送到kafka。

在卡桑德拉水槽,我面對錯誤時,

ERROR任務卡桑德拉散熱器訂單-0扔一個未捕獲和不可恢復的異常(org.apache.kafka.connect.runtime.WorkerTask:142) 有機.apache.kafka.connect.errors.DataException:無法將數據反序列化到Avro: at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109) at org.apache.kafka.connect.runtime。 WorkerSinkTask.convertMessages(WorkerSinkTask.java:346) 在org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226) 在org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTa sk.java:170) 在org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 在org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:511) at java.util.concurrent。 FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 引起:org.apache.kafka.common.errors.SerializationException:錯誤解串FO阿夫羅消息r id -1 引起:org.apache.kafka.common.errors.SerializationException:未知的魔術字節! [2016年9月28日15:47:00951] ERROR任務正在被殺死,並不會恢復,直到手動重新啓動(org.apache.kafka.connect.runtime.WorkerTask:143) [2016年9月28日15時47分:00,951]信息停止卡桑德拉水槽。 (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79) [2016年9月28日15:47:00952] INFO關閉卡桑德拉驅動會話和羣集。 (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)

模式,我使用

./confluent/bin/kafka-avro-console-producer \--broker-list localhost:9092 \--topic orders-topic \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}' 

配置的水槽: 水槽,kafka.conf。性能

agent.sources = spoolDirSrc 
agent.channels = memoryChannel 
agent.sinks = kafkaSink 


agent.sources.spoolDirSrc.type = spooldir 
agent.sources.spoolDirSrc.spoolDir = eventlogs 
agent.sources.spoolDirSrc.inputCharset = UTF-8 
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576 

agent.sources.spoolDirSrc.channels = memoryChannel 
agent.sinks.kafkaSink.channel = memoryChannel 
agent.channels.memoryChannel.type = memory 

agent.channels.memoryChannel.capacity = 1000 

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink 
agent.sinks.kafkaSink.topic = orders-topic 
agent.sinks.kafkaSink.brokerList = localhost:9092 
agent.sinks.kafkaSink.channel = memoryChannel 
agent.sinks.kafkaSink.batchSize = 20 

任何人都可以請幫我,如何解決這個問題?

回答

0

通常,如果你有一個未知的魔術字節,這意味着你的卡夫卡客戶端和服務器版本是不兼容的。請檢查以確保您的Cassandra接收器版本是使用小於或等於您的經紀商的版本與Kafka客戶端庫一起構建的。

+0

我檢查過,所有的罐子都是0.10,我仍然收到相同的錯誤 – Chirag

+0

水槽是否知道如何從模式註冊表中獲取模式?對不起,我對這個設置不太熟悉,但看起來你是反序列化問題,可能是因爲序列化程序不知道如何用魔法字節查找模式? – dawsaw

+0

沒有水槽不知道,但我期待,我可以提供架構,而序列化 – Chirag