我使用Kafka-Cassandra-Sink將數據從Kafka推送到Cassandra。 Kafka以JSON格式從logstash獲取所有時間數據,並且我想通過連接器將相同的數據推送到Cassandra。Kafka連接Cassandra連接器
我是由本教程領導的:http://docs.datamountaineer.com/en/latest/cassandra-sink.html,我在模式註冊表中從avro更改爲JSON格式。之後我做了新的實例我有這種哪些,我可以看到一個錯誤是常見的:
[2017-02-10 09:01:49,978] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390)
java.lang.RuntimeException: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.handleTry(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.insert(CassandraJsonWriter.scala:144)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.write(CassandraJsonWriter.scala:104)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.put(CassandraSinkTask.scala:72)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30)
... 3 more
[2017-02-10 09:01:49,981] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391)
我試過其他的方式,通過logstash發送Avro的卡夫卡但我不知道該如何處理與像@timestamp和版本屬性或。換句話說,我不知道如何創建Avro的模式,這將成功地解析這兩個字段,因爲我讓周圍的架構屬性unrecognizing @錯誤。
有人可以給我建議或另一個接收器。它不一定要融合,我只是需要接收器才能從卡夫卡獲取這些數據給卡桑德拉。謝謝研究員!
卡夫卡主題的來源是什麼?來自Connect還是來自其他地方?是Avro序列化還是JSON?它在Kafka中?你可以張貼你的水槽連接器配置嗎。 –