
Exception handling in a Spark Streaming application

I have developed a Spark Streaming application that monitors a file stream. I need to stop the driver when any exception occurs in my streaming application. My code is as follows:

val fileStream = ...
fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))

// initiate the checkpointing
fileStream.foreachRDD(r => {
  try {
    r.count()
  } catch {
    case ex: Exception =>
      ssc.stop(true, true)
  }
})

However, I get the following exception from the above code:

yarn.ApplicationMaster: User class threw exception: 
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
org.apache.spark.streaming.StreamingContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...)
    - field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext) 
    - object (class UnionStream$$anonfun$creatingFunc$3, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream) 
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@...)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, 

I am running my code on YARN in cluster mode.

Answers


Have you tried taking the try {} catch out of the foreachRDD and instead wrapping the call to foreachRDD itself in a try + catch {}, something like this:

try {
  // initiate the checkpointing
  fileStream.foreachRDD(r => {
    r.count()
  })
} catch {
  case ex: Exception =>
    ssc.stop(true, true)
}

From the exception it looks like Spark is taking all the code inside the foreachRDD block, including the exception handler that needs the StreamingContext, and trying to serialize it so it can send it to the nodes that will process the current RDD. Since StreamingContext is not serializable, it blows up.
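
Strictly speaking, the trigger here is checkpointing: with DStream checkpointing enabled, the foreachRDD function is serialized as part of the DStream graph, so even a driver-side reference to ssc inside the closure causes the NotSerializableException. If you do want to react to failures from inside foreachRDD, one way around it is to avoid capturing ssc in the closure and look up the active context instead. A minimal sketch, assuming Spark 1.6+ where StreamingContext.getActive is available:

import org.apache.spark.streaming.StreamingContext

fileStream.foreachRDD(r => {
  try {
    r.count()
  } catch {
    case ex: Exception =>
      // Look up the running context instead of closing over ssc,
      // so the checkpointed closure stays serializable.
      StreamingContext.getActive().foreach(_.stop(stopSparkContext = true, stopGracefully = true))
  }
})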


May I ask a question? Does r.count() always run on the driver node? I need to get the exception from the driver. – mahdi62


Whatever you do inside the foreach on the RDD goes to the nodes that process the tasks. But what you want is to stop the context when an error occurs, and the context can only be accessed from the driver. –
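
To make the split concrete, here is a small illustrative sketch of where each piece of a foreachRDD block executes (the println calls are placeholders):

fileStream.foreachRDD(rdd => {
  // This function body runs on the driver, once per batch;
  // count() is an action whose tasks run on the executors.
  println(s"batch size: ${rdd.count()}")

  rdd.foreach(record => {
    // This inner closure is shipped to the executors,
    // so it must not reference the StreamingContext.
    println(record)
  })
})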


To stop the Spark application when an exception occurs in a foreachRDD call, do not try to catch the exception inside foreachRDD. Instead, wrap the ssc.awaitTermination call in a try/catch block and call ssc.stop from there:

val ssc = createStreamingContext()   
ssc.start() 
try { 
    ssc.awaitTermination() 
} catch { 
    case e: Exception => 
    ssc.stop(stopSparkContext = true, stopGracefully = true) 
    throw e // to exit with error condition 
}
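
For completeness, a minimal sketch of how this fits together with a checkpoint-recoverable context, using StreamingContext.getOrCreate so the job can also restart from the checkpoint; createStreamingContext, the checkpoint directory, and batchIntervalSeconds are placeholders standing in for the asker's own setup:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/checkpoint" // placeholder path
val batchIntervalSeconds = 10                // placeholder interval

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("FileStreamApp")
  val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))
  ssc.checkpoint(checkpointDir)
  // define fileStream and its foreachRDD logic here
  ssc
}

// Recover from the checkpoint if one exists, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    throw e // exit with an error condition so YARN marks the application as failed
}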