2016-12-01 82 views

I am running a Spark streaming reader on Kafka and getting a Spark-Kafka streaming exception: object not serializable, ConsumerRecord.

Following are the dependencies:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
</dependencies>

When some data, say "Hi----3", is produced to the Kafka topic, the following exception is raised (although I can see the data inside the exception):

Serialization stack: 
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3")) 

I am not doing any computation on the RDD (that throws the same exception as well). Even stream.print() throws the exception.
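The root cause is plain Java serialization: ConsumerRecord does not implement java.io.Serializable, so any operation that makes Spark ship records between JVMs (print() collects them to the driver) fails. A minimal JVM-only sketch of the same failure mode, using a hypothetical stand-in class rather than the real ConsumerRecord:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ConsumerRecord: a plain class that is NOT Serializable
class FakeRecord(val key: String, val value: String)

// Case classes (like tuples) are Serializable by default
case class Extracted(key: String, value: String)

// Returns true if Java serialization of obj succeeds, false if it throws
def trySerialize(obj: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }

println(trySerialize(new FakeRecord(null, "Hi----3"))) // false: not Serializable
println(trySerialize(Extracted(null, "Hi----3")))      // true: case class is Serializable
```

This is why mapping records to plain key/value pairs before collecting them avoids the exception.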

Following is the code:

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.kafka.common.serialization.StringDeserializer 

class Metrics { 

  def readKafka() { 
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092", 
      "key.deserializer" -> classOf[StringDeserializer], 
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "use_a_separate_group_id_for_each_stream", 
      "auto.offset.reset" -> "latest", 
      "enable.auto.commit" -> (false: java.lang.Boolean)) 

    val topics = Array("q_metrics") 
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer") 
    val streamingContext = new StreamingContext(sc, Seconds(10)) 

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams)) 

    stream.print() 

    streamingContext.start() 
    streamingContext.awaitTermination() 
  } 

  def rddReader(rdd: Array[String]) = { 
  } 
} 

object MetricsReader { 
  def main(args: Array[String]): Unit = { 
    val objMetrics = new Metrics() 
    objMetrics.readKafka() 
  } 
} 

Any help is appreciated.

Thanks

Are you able to receive the messages on the Kafka consumer console? – user4342532

No. I see the message as part of the exception. – Raaghu

I think you need to add some jars to your kafka/lib location, such as metrics-core and kafka-clients (if they are not present). – user4342532

Answer

Found the problem: we cannot print the stream directly because print() serializes each ConsumerRecord, which is not serializable. So I used map to extract the key and value from each record, collected the pairs, and then printed them:

stream.foreachRDD { rdd => 
  val collected = rdd.map(record => (record.key(), record.value())).collect() 
  for (c <- collected) { 
    println(c) 
  } 
}
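An equivalent fix, sketched under the same Spark 2.0.1 / spark-streaming-kafka-0-10 setup (not tested against a live broker), is to do the mapping on the DStream itself, so that print() only ever serializes plain String tuples rather than ConsumerRecord objects:

```scala
// Map each ConsumerRecord to a (key, value) tuple of Strings before
// print() needs to ship anything to the driver; String tuples are Serializable.
val kv = stream.map(record => (record.key(), record.value()))
kv.print() // prints up to the first 10 (key, value) pairs of each batch
```

This avoids the extra collect() on every RDD and keeps the pipeline a pure DStream transformation.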