
The problem I am facing with Spark Streaming is that I want to use broadcast, mapWithState, and checkpointing together, but mapWithState throws java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast when recovering from a checkpoint.

Here is the usage:

  • Since I need to pass some connection objects (which are not serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
  • Since we want to keep some cached information across batches, I use stateful streaming with mapWithState
  • I also use checkpointing of my streaming context

I also need to pass the broadcast connection object into mapWithState in order to fetch some data from an external source.

The flow works fine when the context is created fresh. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.
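For context, a checkpointed streaming context is typically created through StreamingContext.getOrCreate. A minimal sketch of that setup (the checkpoint path, app name, and batch interval are placeholders, not the actual values from the repository):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("stateful-with-broadcast")
  val ssc = new StreamingContext(conf, Seconds(4))
  ssc.checkpoint("/path/to/checkpoint") // placeholder checkpoint directory
  // ... build the input DStream and the mapWithState pipeline here ...
  ssc
}

// On a fresh start this calls createContext(); after a crash the context,
// including the serialized mapWithState function, is rebuilt from the checkpoint.
val ssc = StreamingContext.getOrCreate("/path/to/checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()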

I have put together a small code snippet, based on an example from asyncified.io, that reproduces the issue on GitHub:

  • My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
  • The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

Dummy snippet of the code:


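A minimal sketch of the pattern, assuming a KafkaWriter is broadcast on the driver and read inside the state update function; the name updateUserEvents matches the stack trace below, while ssc, userEventsStream, and the KafkaWriter constructor are illustrative:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{State, StateSpec}

// Broadcast the (non-serializable) writer once, on the driver.
val kafkaWriter: Broadcast[KafkaWriter] =
  ssc.sparkContext.broadcast(KafkaWriter(kafkaProducerSettings))

def updateUserEvents(key: String,
                     value: Option[String],
                     state: State[Int]): Option[Int] = {
  // The broadcast is read inside mapWithState; after recovery from a
  // checkpoint, this is the call that throws the ClassCastException.
  kafkaWriter.value.someMethodCall()

  val newCount = state.getOption().getOrElse(0) + 1
  state.update(newCount)
  Some(newCount)
}

val stateful = userEventsStream.mapWithState(
  StateSpec.function(updateUserEvents _))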
When executing

kafkaWriter.value.someMethodCall()

I get the following error:

17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4) 
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78) 
    at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77) 
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181) 
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55) 
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Basically, kafkaWriter is the broadcast variable, and kafkaWriter.value should return the variable we broadcast, but instead it returns a SerializableConfiguration that cannot be cast to the desired object.

Thanks in advance for the help!


Why do you need 'KafkaWriter' inside 'mapWithState'? Can you make the call before updating the state? Perhaps run it inside 'mapPartitions' instead? By the way, your example seems to have a copy/paste error, as some of the code is duplicated. –


Thanks for the reply, Yuval. This is a dummy example, just to reproduce the issue. In our real use case we have to fetch some data with a JDBC call to the db, which we use to update the state, so we have to pass the broadcast into mapWithState. Also, if you are referring to SparkStateRunner and SparkStateRunnerWithBroadcast as the duplication, the former does not pass the broadcast into mapWithState while the latter does. – Saman


I see. Have you considered making the JDBC call before invoking 'mapWithState'? –

Answer


Broadcast variables cannot be used with mapWithState (or transformation operations in general) if we need to recover from the checkpoint directory in Spark Streaming. In that case they can only be used inside output operations, because re-initializing the broadcast lazily requires the Spark context:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;

// Lazily instantiated singleton of the broadcast variable, so it can be
// re-created from the live context after recovering from a checkpoint.
class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaWordBlacklist.class) {
                if (instance == null) {
                    List<String> wordBlacklist = Arrays.asList("a", "b", "c");
                    instance = jsc.broadcast(wordBlacklist);
                }
            }
        }
        return instance;
    }
}

// Lazily instantiated singleton of the accumulator, for the same reason.
class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
            synchronized (JavaDroppedWordsCounter.class) {
                if (instance == null) {
                    instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
                }
            }
        }
        return instance;
    }
}

wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist =
        JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter =
        JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
            droppedWordsCounter.add(wordCount._2());
            return false;
        } else {
            return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
});
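Applied to the question, the same lazy-singleton pattern might look like this in Scala, with the broadcast fetched inside an output operation such as foreachRDD instead of being captured by the mapWithState function. Here stateful stands for the mapWithState stream from the sketch above; KafkaWriterHolder and the KafkaWriter constructor are illustrative, not part of the linked repository:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily (re-)instantiated singleton; survives recovery from a checkpoint
// because the broadcast is re-created from the live SparkContext.
object KafkaWriterHolder {
  @volatile private var instance: Broadcast[KafkaWriter] = _

  def getInstance(sc: SparkContext): Broadcast[KafkaWriter] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(KafkaWriter(kafkaProducerSettings))
        }
      }
    }
    instance
  }
}

stateful.foreachRDD { rdd =>
  // Get or register the broadcast inside the output operation.
  val writer = KafkaWriterHolder.getInstance(rdd.sparkContext)
  rdd.foreachPartition { partition =>
    partition.foreach(_ => writer.value.someMethodCall())
  }
}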