Exception handling in a Spark Streaming application
I have developed a Spark Streaming application that monitors a file stream. I need to stop the driver on any exception in my streaming application. My code is as follows:
val fileStream = ..
// initiate the checkpointing
fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))
fileStream.foreachRDD(r => {
  try {
    r.count()
  } catch {
    case ex: Exception =>
      ssc.stop(true, true)
  }
})
However, I get the following exception from the code above:
yarn.ApplicationMaster: User class threw exception:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...)
- field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
- object (class UnionStream$$anonfun$creatingFunc$3, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
I am running my code in yarn-cluster mode.
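One way around the serialization error is to avoid referencing ssc inside the checkpointed foreachRDD closure at all. A minimal sketch (assuming the fileStream, ssc, and batchIntervalSeconds from the question; the streamError name is hypothetical) records the failure in a serializable holder instead, and stops the context from the driver's main thread:

```scala
import java.util.concurrent.atomic.AtomicReference

// serializable holder, so checkpointing the closure does not fail
val streamError = new AtomicReference[Throwable](null)

fileStream.foreachRDD { r =>
  try {
    r.count() // the action is triggered from the driver
  } catch {
    case ex: Exception => streamError.set(ex)
  }
}

ssc.start()
// poll instead of blocking forever, so the driver can react to the flag;
// awaitTerminationOrTimeout returns true once the context has stopped
while (streamError.get == null &&
       !ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 1000L)) {}
if (streamError.get != null) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```

This keeps the StreamingContext out of the serialized DStream graph, which is what the NotSerializableException above is complaining about.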
May I ask a question? Does r.count() always run on the driver node? I need to catch the exception on the driver. – mahdi62
Whatever you do inside foreach on the RDD goes to the nodes processing the tasks. But what you want is to stop the context when an error occurs, and the context is only accessible from the driver. –
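To illustrate the comment above, a rough sketch (the record handling is hypothetical) of where each part of a foreachRDD call actually runs:

```scala
fileStream.foreachRDD { rdd =>
  // this outer block runs on the driver, once per batch;
  // exceptions from actions triggered here surface on the driver
  rdd.foreach { record =>
    // this inner function is serialized and shipped to executor nodes;
    // it must not reference the StreamingContext
    println(record)
  }
}
```

So the action itself is triggered from the driver, but the per-record function travels to the executors, which is why the StreamingContext must stay out of it.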