Spark Streaming mapWithState gives java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast when recovering from a checkpoint
The problem I am currently facing: I want to use a broadcast variable together with mapWithState and checkpointing in Spark Streaming.
Here is the usage:
- Since I have to pass some connection objects (which are not serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
- Since we want to maintain some cached information, I use stateful streaming with mapWithState
- I also use checkpointing of my streaming context
I also need to pass the broadcasted connection object into mapWithState in order to fetch some data from an external source.
The flow works fine when the context is created fresh. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.
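For context, here is a minimal sketch of how the pieces fit together, assuming a getOrCreate-style recovery; the checkpoint path, batch interval and KafkaWriter constructor arguments are illustrative placeholders, not the actual project code:

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

import yuvalitzchakov.utils.KafkaWriter

// Builds a fresh context: enable checkpointing and broadcast the wrapper
// around the non-serializable connection object.
def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("stateful-with-broadcast")
  val ssc = new StreamingContext(conf, Seconds(4)) // illustrative batch interval
  ssc.checkpoint(checkpointDir)

  val kafkaWriter: Broadcast[KafkaWriter] =
    ssc.sparkContext.broadcast(new KafkaWriter(/* connection config, placeholder */))

  // ... build the input DStream and apply mapWithState here, closing over
  // kafkaWriter (see the dummy snippet below) ...
  ssc
}

// On restart this recovers from the checkpoint instead of calling createContext;
// that recovery path is where the ClassCastException appears.
val checkpointDir = "/path/to/checkpoint" // placeholder
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()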
I have put a small code snippet reproducing the problem, based on an example from asyncified.io, on GitHub:
- My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
- The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala
A dummy snippet of the code:
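(The snippet itself did not survive here; below is a hedged reconstruction based on the stack trace further down. The key/value/state types and the body of the update function are guesses; only the kafkaWriter.value.someMethodCall() line is from the original.)

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{State, StateSpec}

// Approximate shape of the dummy state update; the real code lives in
// SparkStatefulRunnerWithBroadcast.scala. All types here are placeholders.
def updateUserEvents(kafkaWriter: Broadcast[KafkaWriter])(
    key: String,
    event: Option[String],
    state: State[String]): Option[String] = {
  // Dereferencing the broadcast inside the state update function is the
  // call that fails after recovery from a checkpoint:
  kafkaWriter.value.someMethodCall()
  // ... merge the incoming event into the session state ...
  None
}

val stateSpec = StateSpec.function(updateUserEvents(kafkaWriter) _)
// eventStream: DStream[(String, String)], placeholder input stream
val statefulStream = eventStream.mapWithState(stateSpec)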
When executing
kafkaWriter.value.someMethodCall()
I get the following error:
17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Basically kafkaWriter is the broadcast variable, and kafkaWriter.value should return us the variable we broadcast, but it returns a SerializableConfiguration instead, which cannot be cast to the desired object.
Thanks in advance for any help!
Why do you need the 'KafkaWriter' inside 'mapWithState'? Could you make that call before updating the state, perhaps with something that runs inside 'mapPartitions'? By the way, your example seems to have a copy/paste error, as some of the code is duplicated. –
Thanks for the reply, Yuval. This is a dummy example, just to reproduce the issue. In our real use case we have to fetch some data from a DB with a JDBC call, which we then use to update the state, so we have to pass the broadcast into mapWithState. Also, if you are referring to SparkStateRunner and SparkStateRunnerWithBroadcast as the duplication: the former does not pass the broadcast into mapWithState, while the latter does. – Saman
I see. Have you considered invoking the JDBC driver before the call to 'mapWithState'? –
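(For reference, a minimal sketch of the workaround suggested in these comments: enrich the events per partition before 'mapWithState', so the non-serializable connection never has to be broadcast or recovered from the checkpoint. The JDBC URL, query, and types are illustrative placeholders, not the actual project code.)

import java.sql.DriverManager

// Enrich each (key, event) pair with a DB lookup inside mapPartitions,
// creating the JDBC connection on the executor, once per partition.
val enriched = eventStream.mapPartitions { events =>
  val conn = DriverManager.getConnection(jdbcUrl) // placeholder URL
  val stmt = conn.prepareStatement("SELECT info FROM lookup WHERE key = ?")
  val result = events.map { case (key, event) =>
    stmt.setString(1, key)
    val rs = stmt.executeQuery()
    val info = if (rs.next()) rs.getString(1) else ""
    (key, (event, info))
  }.toList // materialize before closing the connection
  stmt.close(); conn.close()
  result.iterator
}

// The state update then works on already-enriched events and no longer
// needs to dereference the broadcast inside mapWithState at all.
val statefulStream = enriched.mapWithState(stateSpecForEnriched) // placeholder StateSpec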