2016-03-08 217 views
2

I am trying to read from a Kafka topic (in Java), but it always throws this exception, an UnknownCodecException from Kafka:

kafka.common.UnknownCodecException: 3 is an unknown compression codec 
    at kafka.message.CompressionCodec$.getCompressionCodec(CompressionCodec.scala:26) 
    at kafka.message.Message.compressionCodec(Message.scala:213) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:173) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191) 
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145) 
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) 
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) 
    at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847) 
    at scala.collection.Iterator$$anon$19.skip(Iterator.scala:612) 
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:210) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745)  

The consumer properties are:

group.id=groupTest123 
consumer.id=consumerid 
client.id=clientid 
auto.offset.reset=smallest 

and for the producer:

acks=1 
buffer.memory=67108864 
compression.type=none 
batch.size=16384 
linger.ms=0 

Any ideas?

Thanks!

+1

You are using the wrong codec. Can you post your configuration (producer/consumer)? – Markon

+0

Please update your question with the properties (producer and consumer). – Markon

Answers

1

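Compression type 3 stands for LZ4 compression, which is apparently not supported by the client you are using. See the list of compression types for reference. If I remember correctly, LZ4 was added around 0.8.2.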
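To make the failure mode concrete, here is a minimal sketch of a codec lookup. The id-to-name mapping (0 = none, 1 = gzip, 2 = snappy, 3 = lz4) follows Kafka's compression codec ids; the class `CodecLookup`, the method `lookup`, and the `maxKnownId` parameter are hypothetical names made up for illustration, and this is not Kafka's actual code. It only shows how a client that predates LZ4 would end up rejecting id 3.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CodecLookup {
    // Compression codec ids as carried in the Kafka message attributes.
    private static final Map<Integer, String> ALL_CODECS = new LinkedHashMap<>();
    static {
        ALL_CODECS.put(0, "none");
        ALL_CODECS.put(1, "gzip");
        ALL_CODECS.put(2, "snappy");
        ALL_CODECS.put(3, "lz4");
    }

    // Hypothetical stand-in for a client version: it only recognises
    // codec ids from 0 up to maxKnownId and rejects everything else.
    public static String lookup(int codecId, int maxKnownId) {
        if (codecId < 0 || codecId > maxKnownId || !ALL_CODECS.containsKey(codecId)) {
            throw new IllegalArgumentException(codecId + " is an unknown compression codec");
        }
        return ALL_CODECS.get(codecId);
    }

    public static void main(String[] args) {
        // A client that knows about LZ4 resolves id 3.
        System.out.println(lookup(3, 3));
        // A client that only knows ids 0..2 fails on the same message.
        try {
            lookup(3, 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is what is happening, the message was written with LZ4 by some producer, and the Kafka client library on the consumer's classpath is too old to know that codec.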

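I'm not sure whether any Spark integration can handle LZ4-compressed Kafka payloads, so I think you could try using GZIP or Snappy compression on the producer side, if that is possible.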
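As a rough sketch of that suggestion, the producer properties from the question could be built with an explicit `compression.type`. The class `ProducerCompressionConfig` and the method `producerProps` are hypothetical names for illustration. Only `java.util.Properties` is used here, so connection and serializer settings (for example `bootstrap.servers`) are deliberately left out and would have to be added before handing the properties to a real producer.

```java
import java.util.Properties;

public class ProducerCompressionConfig {
    // Builds the producer settings from the question, with the compression
    // codec passed in so it can be switched to "gzip" or "snappy".
    public static Properties producerProps(String compressionType) {
        Properties props = new Properties();
        props.setProperty("acks", "1");
        props.setProperty("buffer.memory", "67108864");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "0");
        props.setProperty("compression.type", compressionType);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: the consumer side can decode gzip.
        Properties props = producerProps("gzip");
        props.list(System.out);
    }
}
```

Note that messages already written to the topic with LZ4 stay that way, so a consumer reading from the smallest offset may still hit the old messages until they expire or a new topic is used.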