我在Spark 2 CDH 5.9中使用Kafka客戶端0.8運行流作業。簡單的目標是將信息保存在Impala中,並通過記錄進行記錄。由於InvalidClassException,Spark Kafka Streaming作業失敗
我無法擺脫這種錯誤的,因爲我不知道從哪裏它來自何處:
16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming
job 1481726608000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0
(TID 132, datanode1, executor 1):
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat;
local class incompatible: stream classdesc serialVersionUID = 1,
local class serialVersionUID = 2
直接卡夫卡流用
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092",
"group.id" -> "myconsumergroup",
"auto.offset.reset" -> "largest")
val topics:Set[String] = Set("kafkatest")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics)
簡單地創建和處理通過:
val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache()
directKafkaStream.foreachRDD { rdd =>
val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo]
val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left")
deviceEnriched.show(false)
spark.sql("use my_database")
deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream")
}
streamingContext.start()
streamingContext.awaitTermination()
感謝參孫。這解決了這個問題:)順便說一下,Spark 2是本週從Cloudera發佈的GA,並且帶有** V3.3.2 **。正如你所說的:去圖。我的根本問題是我無法弄清楚哪個對象正在被序列化,從哪裏到哪裏,但是按照您指出的方式強制v3.1解決了問題。 –
...一會兒。異常又回來了,不管是包含** V3.1 **還是** V3.3.2 **,這個異常總是相同的,並且在同一個節點中(我在三個節點上運行這個異常)。所以我認爲它可能與Spark有關,但與我的工作無關?任何其他想法? –
停止該節點解決了這個問題,所以我猜這個節點有一個陳舊的配置。有什麼方法可以刷新它?因爲它是一個VM,所以我試圖從零開始創建它 –