
I have written a Spark Streaming application that needs to perform several operations on various DStreams with underlying transformations. As suggested in this thread (Error in starting Spark streaming context), I have done all of my transformations inside the function that defines and creates the context, and I checkpoint the several file streams in my Spark streaming context:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

object StreamingEngine2 {

    val filterF = { (x: Path) => true } 

    // === Configuration to control the flow of the application === 
    val stopActiveContext = true 

    val batchIntervalSeconds = 10 
    val eventsPerSecond = 10 // For the dummy source 



    var newContextCreated = false // Flag to detect whether new context was created or not 

    // Function to create a new StreamingContext and set it up 
    val creatingFunc = {() => 

    // Point hadoop.home.dir at a winutils installation to avoid:
    // ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error generating jobs for time 1471237080000 ms java.lang.NullPointerException
    System.setProperty("hadoop.home.dir", "C:\\hadoop") 
    println("Creating function called to create new StreamingContext") 

    val conf = new SparkConf()
      .setMaster("local[10]")
      .setAppName("FileStreaming")
      .set("spark.streaming.fileStream.minRememberDuration", "2000000h")
      /*.set("SPARK_CONF_DIR", "src/main/resources")*/
      .registerKryoClasses(Array(
        classOf[org.apache.hadoop.io.LongWritable],
        classOf[org.apache.hadoop.io.Text],
        classOf[GGSN])) // GGSN is the application's own record class (not shown)


    // Verify that the attached Spark cluster is 1.6.0+
    val sc = new SparkContext(conf)
    require(sc.version.replace(".", "").substring(0, 3).toInt >= 160,
      "Spark 1.6.0+ is required to run this code. Please attach it to a Spark 1.6.0+ cluster.")



    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 
    ssc.checkpoint("c:\\checkpoints") 


    val ggsnFileLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "C:\\Users\\Mbazarganigilani\\Documents\\RA\\GGSN\\Files", filterF, newFilesOnly = false)
    val ccnFileLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "C:\\Users\\Mbazarganigilani\\Documents\\RA\\CCN\\Files1", filterF, newFilesOnly = false)



    // ... some mapping and transformations ...

    // Checkpoint each DStream (here every 5 batch intervals).
    // (probeFileLines is not defined in this snippet; ccnFileLines is used here.)
    ccnFileLines.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))
    ggsnFileLines.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))

    // Output action: print the record count of each batch
    ccnFileLines.foreachRDD { rdd =>
      println(rdd.count())
    }


    ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively 



    newContextCreated = true 
    ssc 
    } 

    def main(args: Array[String]): Unit = { 



    System.setProperty("hadoop.home.dir", "C:\\hadoop") 

    if (stopActiveContext) { 
     StreamingContext.getActive.foreach { 
     _.stop(stopSparkContext = false) 
     } 
    } 

    val ssc = StreamingContext.getOrCreate("c:\\checkpoints", creatingFunc)


    if (newContextCreated) { 
     println("New context created from currently defined creating function") 
    } else { 
     println("Existing context running or recovered from checkpoint, may not be running currently defined creating function") 
    } 

    // Start the streaming context in the background. 
    ssc.start() 

    // Block the main thread so the background streaming job has time to run
    // (here, up to two batch intervals).
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2 * 1000)


    } 


} 

However, I still get the following exception when my context is loaded from the checkpoint directory:

16/08/19 22:28:01 ERROR Utils: Exception encountered 
java.lang.NullPointerException 
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:126) 
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124) 
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:516) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:511) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:511) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:511) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:182) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:177) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:177) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:177) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:143) 
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:143) 
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:143) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
    at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:144) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:525) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at StreamingEngine2$.main(StreamEngine2.scala:694) 
    at StreamingEngine2.main(StreamEngine2.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 

Can anyone help me fix this problem with my code? I have tried it on both Spark 1.6 and 2.0 and get the same exception.

Answer


OK, the problem turned out to be the several streams: I should call count on each of them. I am still not sure exactly where count should be called, or whether a checkpoint interval should be set on every individual stream.
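
Below is a minimal sketch of that arrangement, assuming two file streams like the ones in the question (the directory paths, the loop over directories, and the 5-batch checkpoint interval are illustrative placeholders, not a verified fix): every stream is created inside the creating function, gets its own checkpoint interval, and has at least one output action (a simple count) registered before the function returns the StreamingContext, so the whole DStream graph can be serialized into the checkpoint.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object CheckpointSketch {

  val batchIntervalSeconds = 10
  val checkpointDir = "c:\\checkpoints" // same directory as getOrCreate below

  val creatingFunc = { () =>
    val conf = new SparkConf().setMaster("local[10]").setAppName("FileStreaming")
    val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))
    ssc.checkpoint(checkpointDir)

    // Create, checkpoint, and attach an output action to EVERY stream
    // inside this function, so the full DStream graph is part of the
    // checkpoint. The paths below are placeholders.
    val dirs = Seq("C:\\data\\GGSN", "C:\\data\\CCN")
    dirs.foreach { dir =>
      val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
        dir, (_: Path) => true, newFilesOnly = false)
      lines.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))
      lines.foreachRDD(rdd => println(s"$dir: ${rdd.count()}")) // output action per stream
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc)
    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the checkpoint directory passed to StreamingContext.getOrCreate and to ssc.checkpoint is the same; that is what lets a restarted driver recover the existing graph instead of rebuilding it.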