I use hbase-spark to record pv/uv in my spark-streaming project. Then, when I killed the application and restarted it, I got the following exception during checkpoint recovery. Is it possible to recover a broadcast value from a Spark Streaming checkpoint?
16/03/02 10:17:21 ERROR HBaseContext: Unable to getConfig from broadcast
java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable
    at com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645)
    at com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I checked the code of HBaseContext; it uses a broadcast to store the HBase configuration:
class HBaseContext(@transient sc: SparkContext,
                   @transient config: Configuration,
                   val tmpHdfsConfgFile: String = null) extends Serializable with Logging {

  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false
  @transient val job = Job.getInstance(config)
  TableMapReduceUtil.initCredentials(job)

  // <-- broadcast for HBaseConfiguration here !!!
  var broadcastedConf = sc.broadcast(new SerializableWritable(config))
  var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
  ...
During checkpoint recovery, it tries to access this broadcast value in its getConf function:
if (tmpHdfsConfiguration == null) {
  try {
    tmpHdfsConfiguration = configBroadcast.value.value
  } catch {
    case ex: Exception => logError("Unable to getConfig from broadcast", ex)
  }
}
Then the exception occurs. My question is: is it possible to recover a broadcast value from a checkpoint in a Spark application? Or is there some other solution to re-create the broadcast value after recovery?
Thanks for any feedback!
Thanks, it works when I initialize the singleton object –
@伊軒範文 and @He白.. can you explain your solution? If I don't have access to the Spark context inside the map function, how does the executor load the data into the singleton object by itself? – metsathya
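For readers hitting the same issue: the "singleton object" mentioned in the comments appears to be the lazily instantiated singleton pattern that the Spark Streaming programming guide recommends for broadcast variables used together with checkpointing, since broadcast values themselves are not restored from a checkpoint and must be re-created after the driver restarts. Below is a minimal sketch, assuming the HBase configuration can be rebuilt from scratch on the driver; HBaseConfigBroadcast, stream, and the surrounding streaming code are illustrative names, not part of hbase-spark.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Lazily instantiated singleton: the broadcast is (re)created on first use
// after a restart, instead of being read back from the checkpoint.
object HBaseConfigBroadcast {
  @volatile private var instance: Broadcast[SerializableWritable[Configuration]] = null

  def getInstance(sc: SparkContext): Broadcast[SerializableWritable[Configuration]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // Rebuild the Configuration from scratch on the driver and broadcast it again.
          val conf = HBaseConfiguration.create()
          instance = sc.broadcast(new SerializableWritable(conf))
        }
      }
    }
    instance
  }
}

// In the streaming job, look the broadcast up inside foreachRDD (which runs on
// the driver), so that a job recovered from checkpoint never references the
// stale broadcast that was serialized before the restart.
stream.foreachRDD { rdd =>
  val confBroadcast = HBaseConfigBroadcast.getInstance(rdd.sparkContext)
  rdd.foreachPartition { iter =>
    val hbaseConf = confBroadcast.value.value
    // ... write the pv/uv mutations to HBase using hbaseConf ...
  }
}

Note that getInstance is called on the driver inside foreachRDD, not in a map function on the executors; the executors only fetch the value of the freshly created broadcast.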