2016-03-02 79 views
0

我使用hbase-spark在我的spark-streaming項目中記錄了pv/uv。然後,當我殺了應用程序,並重新啓動它,我有以下異常,而檢查點恢復:是否有可能從Spark-streaming檢查點恢復廣播值

16/03/02 10點17分21秒ERROR HBaseContext:從廣播 java.lang.ClassCastException無法getConfig: [B不能轉換爲com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645)org.apache.spark.SerializableWritable com.paitao.xmlife.contrib.hbase.HBaseContext.com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ hbaseForeachPartition(HBaseContext.scala:627) at com.paitao.xmlife.contrib.hbase.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457) at com.paitao.xmlife.contrib.hba se.HBaseContext $$ anonfun $ com $ paitao $ xmlife $ contrib $ hbase $ HBaseContext $$ bulkMutation $ 1.apply(HBaseContext.scala:457) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD.scala:898) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply(RDD.scala:898) at org.apache .spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1839) at org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply(SparkContext.scala:1839) at org.apache.spark .scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor $ TaskRunner.run(Executor .scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecut or.java:1142) 在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java:617) 在java.lang.Thread.run(Thread.java:745)

我檢查HBaseContext的代碼,它使用廣播來存儲HBase配置。

class HBaseContext(@transient sc: SparkContext, 
       @transient config: Configuration, 
       val tmpHdfsConfgFile: String = null) extends Serializable with Logging { 

    @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() 
    @transient var tmpHdfsConfiguration: Configuration = config 
    @transient var appliedCredentials = false 
    @transient val job = Job.getInstance(config) 

    TableMapReduceUtil.initCredentials(job) 
    // <-- broadcast for HBaseConfiguration here !!! 
    var broadcastedConf = sc.broadcast(new SerializableWritable(config)) 
    var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials())) 
    ... 

當檢查點恢復,它試圖在其getConf FUNC訪問此廣播值:

if (tmpHdfsConfiguration == null) { 
    try { 
    tmpHdfsConfiguration = configBroadcast.value.value 
    } catch { 
    case ex: Exception => logError("Unable to getConfig from broadcast", ex) 
    } 
} 

然後發生異常。我的問題是:是否有可能從火花應用程序中的檢查點恢復廣播值?我們還有其他一些解決方案重播恢復後的值?

感謝您的任何反饋!

回答

1

目前,這是Spark的一個已知的bug。投稿人一直在調查這個問題,但沒有取得任何進展。

這是我的解決方法:我不是將數據加載到廣播變量並向所有執行者廣播,而是讓每個執行者將數據本身加載到單例對象中。

順便說一句,跟着這個問題更改https://issues.apache.org/jira/browse/SPARK-5206

+0

謝謝,它的工作原理,當我初始化singleton對象 –

+0

@伊軒範文及@He白..你能解釋一下你的解決方案。如果我在Map函數中如果沒有訪問Spark上下文,那麼executor如何將數據本身加載到單例對象中? – metsathya

相關問題