0

我在Spark 2 CDH 5.9中使用Kafka客戶端0.8運行流作業。簡單的目標是將信息保存在Impala中,並通過記錄進行記錄。由於InvalidClassException,Spark Kafka Streaming作業失敗

我無法擺脫這種錯誤的,因爲我不知道從哪裏它來自何處:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming 
job 1481726608000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 
(TID 132, datanode1, executor 1): 
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; 
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2 

直接卡夫卡流用

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2)) 
val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092", 
    "group.id" -> "myconsumergroup", 
    "auto.offset.reset" -> "largest") 
val topics:Set[String] = Set("kafkatest") 
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics) 

簡單地創建和處理通過:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache() 

directKafkaStream.foreachRDD { rdd => 
    val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo] 

    val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left") 

    deviceEnriched.show(false) 
    spark.sql("use my_database") 
     deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream") 
} 

streamingContext.start() 
streamingContext.awaitTermination() 

回答

2

簡短回答:消息被序列化的版本commons-lang3 JAR即與您使用Spark的JAR不兼容

龍回答:如果你剛剛說谷歌搜索的錯誤消息,則檢索Apache的共享源代碼,你會發現...

  • this post是挖掘到Java「類不兼容的」序列化的問題,在一般
  • FastDateFormat陳述的源代碼serialVersionUID = 1LV3.1直到但V3.2切換到serialVersionUID = 2L(因爲二進制結構當時改變)

順便說一句,我只是檢查和鼎暉5.9附帶commons-lang3V3.1(對蜂房,黑斑羚,哨兵,蜂房式,Oozie的,Sqoop功能於Oozie的)和V3.3.2(爲Spark -in-Oozie)和V3.4(對於Sqoop),而Spark本身根本不需要它。去搞清楚。
而且由於CDH尚未附帶Spark 2,我猜你或者下載了「beta」包裹或Apache版本 - 並且我檢查了Apache版本(V2.0.2)隨附commons-lang3V3.3.2

我的2美分:只需在您的Spark 2命令行中強制--jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar,並查看這是否足以解決您的問題。

編輯  可加2分錢,請確保您的「自定義」 JAR得到優先於任何JAR已經在紗線類路徑,與--conf spark.yarn.user.classpath.first=true

+0

感謝參孫。這解決了這個問題:)順便說一下,Spark 2是本週從Cloudera發佈的GA,並且帶有** V3.3.2 **。正如你所說的:去圖。我的根本問題是我無法弄清楚哪個對象正在被序列化,從哪裏到哪裏,但是按照您指出的方式強制v3.1解決了問題。 –

+0

...一會兒。異常又回來了,不管是包含** V3.1 **還是** V3.3.2 **,這個異常總是相同的,並且在同一個節點中(我在三個節點上運行這個異常)。所以我認爲它可能與Spark有關,但與我的工作無關?任何其他想法? –

+0

停止該節點解決了這個問題,所以我猜這個節點有一個陳舊的配置。有什麼方法可以刷新它?因爲它是一個VM,所以我試圖從零開始創建它 –

相關問題