
When I run my Spark Streaming application on YARN, I keep getting the following error while the streaming job executes: "java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext".

Why does this error occur, and how can I fix it? Any suggestions would be appreciated, thanks~

15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing 
    15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms 
    15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms 
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms 
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms 
    15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext 
    java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext 
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 

The Spark Streaming application code is shown below; I run it in spark-shell.

import kafka.cluster.Cluster 
import kafka.serializer.StringDecoder 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kafka.KafkaUtils 
import org.apache.spark.streaming.{Duration, StreamingContext} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.StreamingContext._ 

val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
    Some(0) 
} 

val ssc = new StreamingContext(sc, 
    new Duration(5000)) 
ssc.checkpoint(".") 

val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3)) 

val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc) 
uuidDstream.count().print() 

ssc.start() 
ssc.awaitTermination() 

We are not mind readers. Could you post the code? –

Answer


The closure used by updateStateByKey pulls in the reference to val updateFunc, and with it the rest of the enclosing instance, including the StreamingContext. Since StreamingContext is not serializable, checkpointing the DStream graph fails with NotSerializableException.

Two options:

  • Quick fix: declare the streaming context transient => @transient val ssc = ... It is also a good idea to annotate the DStream declarations with @transient.
  • A better solution: move your functions into a separate object.

Like this:

case object TransformFunctions { 
    val updateFunc = ??? 
}
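
For reference, here is a minimal sketch of how the question's spark-shell snippet might look with the second option applied. The object name TransformFunctions and the Some(0) update logic are taken from the answer and the question; everything else (reusing the same Kafka parameters, adding @transient on ssc as per the quick fix) is illustrative, not the answerer's exact code:

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

case object TransformFunctions {
  // Defined outside the shell's line object, so serializing this function
  // no longer drags a StreamingContext along with it.
  val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(0)
}

@transient val ssc = new StreamingContext(sc, new Duration(5000))
ssc.checkpoint(".")

val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

val uuidDstream = lines.transform(rdd => rdd.map(_._2))
  .map(x => (x, 1))
  .updateStateByKey[Int](TransformFunctions.updateFunc)

uuidDstream.count().print()

ssc.start()
ssc.awaitTermination()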