  • Spark 2.0.0
  • Apache Kafka 0.10.1.0
  • Scala 2.11.8

How do I resolve "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in a Spark Streaming Kafka consumer? When I use the Spark Streaming and Kafka integration with Kafka broker version 0.10.1.0, the Scala code below fails with the following exception:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 

Why does this happen, and how do I fix it?


Code:

import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark._ 
import org.apache.commons.codec.StringDecoder 
import org.apache.spark.streaming._ 

object KafkaConsumer_spark_test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("local1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => (record.key, record.value))
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Exception:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying 
16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

@Chenghao Lü: There is an import of the `ConsumerRecord` type. BTW, why do you import this class, and where do you use it? – Shankar


@Shankar `ConsumerRecord` is not used in this application; I just copied the imports from the [demo](http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html). –


@LostInOverflow Do you mean that `ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)` is not serializable? –

Answers


The ConsumerRecord objects are what the DStream receives. When you try to print the stream, you get this error because ConsumerRecord is not serializable. Instead, you should get the value from each ConsumerRecord and print that.

Instead of stream.print(), do this:

stream.map(record => record.value().toString).print()

This should resolve your issue.
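
For context, note that in the question's code the result of `stream.map(record => (record.key, record.value))` is discarded, and `print()` is then called on the original stream of non-serializable ConsumerRecord objects. A minimal sketch of the relevant part of the driver program with the two calls chained (a sketch only; ssc, topics, and kafkaParams as in the question):

// Relevant excerpt; SparkConf, StreamingContext, kafkaParams and topics as in the question.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Map to serializable Strings first, then print the mapped DStream
// (the original code printed `stream` itself).
stream.map(record => record.value().toString).print()

ssc.start()
ssc.awaitTermination()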


**Thanks!** Also, I found my mistake here. Actually, what I wanted to do was `stream.map(record => (record.key, record.value)).print()` –


KafkaUtils.createDirectStream creates an org.apache.spark.streaming.dstream.DStream, which is not an RDD. Spark Streaming creates RDDs on the fly as it runs. To get at an RDD, use stream.foreachRDD() and then RDD.foreach to get each object in that RDD. These objects are the Kafka ConsumerRecords, and you use their value() method to read the message from the Kafka topic:

stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val value = record.value()
    println(value)
  }
}
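
One design note on this pattern, with a small illustrative sketch (assuming the same `stream` as above): `println` inside `rdd.foreach` runs on the executors, so on a cluster its output appears in the executor logs rather than on the driver console. To inspect a few records on the driver, map to serializable values first and then `take()` them back:

stream.foreachRDD { rdd =>
  // record.value() is a plain String, so the mapped RDD is safe to collect
  rdd.map(record => record.value())
     .take(5)                 // brings at most 5 values back to the driver
     .foreach(println)        // runs on the driver
}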

Did you mean: `stream.foreachRDD { rdd => rdd.foreach { record => val value = record.value(); val key = record.key(); println(key + ":" + value) } }`? – HansHarhoff