Kafka UnknownCodecException
I am trying to read from a Kafka topic (in Java), but this exception is always thrown:
kafka.common.UnknownCodecException: 3 is an unknown compression codec
at kafka.message.CompressionCodec$.getCompressionCodec(CompressionCodec.scala:26)
at kafka.message.Message.compressionCodec(Message.scala:213)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:173)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
at scala.collection.Iterator$$anon$19.skip(Iterator.scala:612)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:210)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The consumer properties are:
group.id=groupTest123
consumer.id=consumerid
client.id=clientid
auto.offset.reset=smallest
The producer properties are:
acks=1
buffer.memory=67108864
compression.type=none
batch.size=16384
linger.ms=0
Any ideas?
Thanks!
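You are using the wrong codec. Can you post your configuration (producer/consumer)? – Markon
Please update your question with the properties (producer and consumer). – Markon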
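To make the "wrong codec" comment concrete, here is a minimal sketch of how the numeric codec id in the exception might be interpreted. It assumes the usual Kafka message-format ids (0 = none, 1 = gzip, 2 = snappy, 3 = lz4). If that holds, "3 is an unknown compression codec" would suggest the topic contains LZ4-compressed messages and the Kafka client library on the consumer side is too old to know LZ4. This is a guess at the cause, not a confirmed diagnosis. The class `CodecIds` and the method `nameOf` are hypothetical names used only for illustration; they are not part of Kafka or Spark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka API: maps a codec id to a readable name.
final class CodecIds {
    // Assumed id-to-name mapping from the Kafka message format.
    private static final Map<Integer, String> NAMES = new HashMap<Integer, String>();
    static {
        NAMES.put(0, "none");
        NAMES.put(1, "gzip");
        NAMES.put(2, "snappy");
        NAMES.put(3, "lz4");
    }

    // Returns the codec name, or a marker string for ids not in the table.
    static String nameOf(int id) {
        String name = NAMES.get(id);
        return name != null ? name : "unknown(" + id + ")";
    }

    public static void main(String[] args) {
        // The id reported in the stack trace above.
        System.out.println("codec 3 = " + nameOf(3));
    }
}
```

If the mapping is right, the producer's own `compression.type=none` would not rule this out, since other producers writing to the same topic could still use LZ4. Comparing the Kafka version on the Spark classpath with the broker and producer versions would be a reasonable next check.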