2016-07-22 34 views
1

Streaming with checkpointing enabled: java.io.NotSerializableException

I have the following Spark code:

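import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import scala.collection.mutable.ListBuffer

def main(args: Array[String]): Unit = {
    val sc = new SparkContext
    val sec = Seconds(3)
    val ssc = new StreamingContext(sc, sec)
    ssc.checkpoint("./checkpoint")
    val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
    val inputDStream = new ConstantInputDStream(ssc, rdd)

    inputDStream.transform(rdd => {
        val buf = ListBuffer[String]()
        buf += "1"
        buf += "2"
        buf += "3"
        // create a new rdd; note that this references the outer ssc
        val other_rdd = ssc.sparkContext.parallelize(buf)
        rdd.union(other_rdd)
    }).print()

    ssc.start()
    ssc.awaitTermination()
}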

It throws the following exception:

java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
org.apache.spark.streaming.StreamingContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 
    - field (class: com.mirrtalk.Test$$anonfun$main$1, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext) 
    - object (class com.mirrtalk.Test$$anonfun$main$1, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>) 
    - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2) 

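When I remove the line ssc.checkpoint("./checkpoint"), the application works fine, but I need checkpointing enabled.

How can I fix this while keeping checkpointing enabled?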

Answers

1

You can move the context initialization and configuration tasks outside main:

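import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import scala.collection.mutable.ListBuffer

object App {
    // The contexts are members of the object rather than locals of main,
    // so the transform closure does not capture them as fields.
    val sc = new SparkContext(new SparkConf().setAppName("foo").setMaster("local"))
    val sec = Seconds(3)
    val ssc = new StreamingContext(sc, sec)
    ssc.checkpoint("./checkpoint") // enable checkpoint

    def main(args: Array[String]): Unit = {
        val rdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
        val inputDStream = new ConstantInputDStream(ssc, rdd)

        inputDStream.transform(rdd => {
            val buf = ListBuffer[String]()
            buf += "1"
            buf += "2"
            buf += "3"
            val other_rdd = ssc.sparkContext.parallelize(buf)
            rdd.union(other_rdd) // union with the other RDD
        }).print()

        ssc.start()
        ssc.awaitTermination()
    }
}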
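
Another possible approach, sketched here as an untested assumption rather than a confirmed fix, is to keep the contexts local to main but avoid referencing ssc inside the transform function. Every RDD exposes its SparkContext through rdd.sparkContext, so the new RDD can be created from the batch RDD itself. The object name TransformWithoutSsc, the app name and the local[2] master are placeholders, and recovery from an existing checkpoint is not covered.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

// Hypothetical object name, used only for this sketch.
object TransformWithoutSsc {
  def main(args: Array[String]): Unit = {
    // App name and master are placeholder values for a local run.
    val conf = new SparkConf().setAppName("transform-without-ssc").setMaster("local[2]")
    val ssc = new StreamingContext(new SparkContext(conf), Seconds(3))
    ssc.checkpoint("./checkpoint")

    val base = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
    val inputDStream = new ConstantInputDStream(ssc, base)

    inputDStream.transform { rdd =>
      // rdd.sparkContext reaches the SparkContext through the batch RDD,
      // so this function no longer references the outer ssc.
      val otherRdd = rdd.sparkContext.parallelize(Seq("1", "2", "3"))
      rdd.union(otherRdd)
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The only change from the question's code is where the SparkContext comes from inside transform. The function passed to transform then depends only on its own argument.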
+2

Isn't the problem the fact that 'StreamingContext' isn't serializable, and that he's using it inside the transform? –

+0

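@YuvalItzchakov That was my first thought, but it isn't used inside a transformation (it's only used at the stream level), so it isn't directly the problem. It looks like the issue here is more subtle, and the StreamingContext gets dragged in when checkpointing. – zero323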

+0

Is 'transform' called on the driver side or the worker side? –
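
The serialization stack in the question shows the transform function holding a field ssc$1 of type StreamingContext. The effect can be reproduced without Spark using plain Java serialization. In this minimal sketch, FakeContext is a hypothetical stand-in for StreamingContext (any class that is not Serializable), and ClosureCaptureDemo, capturing, clean and isSerializable are illustrative names, not Spark APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: a class that is NOT Serializable.
class FakeContext {
  def makeExtra(): Seq[String] = Seq("1", "2", "3")
}

object ClosureCaptureDemo {
  // Returns true when Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The returned function captures ctx, so serializing the function
  // also requires serializing ctx.
  def capturing(ctx: FakeContext): Seq[String] => Seq[String] =
    xs => xs ++ ctx.makeExtra()

  // This function uses only its argument and captures nothing.
  val clean: Seq[String] => Seq[String] =
    xs => xs ++ Seq("1", "2", "3")

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturing(new FakeContext)))
    println(isSerializable(clean))
  }
}
```

The capturing function fails the check because its captured FakeContext cannot be written, while the clean one passes. Enabling checkpointing makes Spark serialize the DStream functions, which is why a captured context only becomes a problem once ssc.checkpoint is called.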