
Spark Streaming serialization error

I am running into a serialization error in my Spark Streaming application. Below is my driver code:

package com.test
import org.apache.spark._
import org.apache.spark.streaming._
import org.json.JSONObject;
import java.io.Serializable

object SparkFiller extends Serializable {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkFiller").setMaster("local[*]")
    // println("test")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[firehoseToDocumentDB]))
    sparkConf.registerKryoClasses(Array(classOf[PushToDocumentDB]))
    var TimeStamp_Start = 1493836050
    val TimeStamp_Final = 1493836056
    var timeStamp_temp = TimeStamp_Start - 5;
    // val send_timestamps = new firehoseToDocumentDB(TimeStamp_Start,TimeStamp_Final);
    // send_timestamps.onStart();
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.receiverStream(
      new firehoseToDocumentDB(TimeStamp_Start.toString(), TimeStamp_Final.toString()))
    // val timestamp_stream = ssc.receiverStream(new firehoseToDocumentDB(TimeStamp_Start.toString(),TimeStamp_Final.toString()))
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val dbsender = new PushToDocumentDB();
        part.foreach(msg => {
          var jsonobject = new JSONObject(part)
          var temp_pitr = jsonobject.getString("pitr")
          println(temp_pitr)
          if (TimeStamp_Final >= temp_pitr.toLong) {
            ssc.stop()
          }
          dbsender.PushFirehoseMessagesToDocumentDb(msg)
        })
        // dbsender.close()
      })
    })

    println("line",line)))
    println("ankush")
    ssc.start()
    ssc.awaitTermination()
  }

}

When I add the following lines to the code:

var jsonobject = new JSONObject(part)
var temp_pitr = jsonobject.getString("pitr")
println(temp_pitr)
if (TimeStamp_Final >= temp_pitr.toLong) {
  ssc.stop()
}

I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
at com.boeing.SparkFiller$$anonfun$main$1.apply(SparkFiller.scala:26) 
at com.boeing.SparkFiller$$anonfun$main$1.apply(SparkFiller.scala:25) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext 
Serialization stack: 
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@...)
- field (class: com.boeing.SparkFiller$$anonfun$main$1, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
- object (class com.boeing.SparkFiller$$anonfun$main$1, <function1>)
- field (class: com.boeing.SparkFiller$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.boeing.SparkFiller$$anonfun$main$1)
- object (class com.boeing.SparkFiller$$anonfun$main$1$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
... 30 more 

Process finished with exit code 1 

If I remove those lines of code, it works fine.

The issue is because of using ssc.stop() in the rdd. Is there any other way that I can call a shutdown hook from the rdd if it satisfies the condition?


You can't execute 'ssc.stop()' inside 'foreachRDD'; you should wait for it to complete. – freedev


Why not add 'ssc.stop()' after 'ssc.awaitTermination()'? – freedev


@freedev The reason I kept it there is that I want to stop the Spark execution if the if condition is satisfied. –

Answers


Issue is because of using the ssc.stop() in the rdd.

You are right! A Spark context is not serializable and cannot be used inside any task.
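
As a minimal sketch of the failure mode (a hypothetical socketTextStream source stands in for the custom receiver, so this is only an illustration, not your original program): any reference to ssc inside the function handed to foreachPartition drags the non-serializable StreamingContext into the closure that Spark tries to serialize and ship to the executors.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ClosureCaptureDemo").setMaster("local[*]"), Seconds(5))
    // Hypothetical source used only for this sketch.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD(rdd => {
      // The function passed to foreachPartition is serialized and shipped to executors.
      rdd.foreachPartition(part => {
        part.foreach(msg => {
          // Referencing ssc here forces Spark to serialize the enclosing closure
          // together with the StreamingContext, which throws "Task not serializable".
          ssc.stop()
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}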

Is there any other way that I can call a shutdown hook from the rdd if it satisfies the condition?

To control the lifecycle of your streaming application, you should consider overriding a listener and stopping the context based on your condition. I did enough research on this and found that it was the only feasible solution.
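
The code from the linked post is not reproduced here; the sketch below is only one way to realize the same idea, with the flag name stopRequested, the socketTextStream source and the pitr handling made up for illustration. The stop condition is evaluated on the driver inside foreachRDD, recorded in a flag, and the main thread polls with awaitTerminationOrTimeout, so ssc.stop() is only ever called on the driver and never from inside a task.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json.JSONObject

object GracefulStopSketch {
  @volatile private var stopRequested = false // driver-side flag (illustrative name)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("GracefulStopSketch").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val TimeStamp_Final = 1493836056L

    // Stand-in source; assumed to deliver one JSON string per record, like the custom receiver.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD(rdd => {
      // This block runs on the driver, so it may update the flag; only the map
      // function below is serialized and sent to the executors.
      if (!rdd.isEmpty()) {
        val maxPitr = rdd.map(msg => new JSONObject(msg).getString("pitr").toLong).max()
        if (TimeStamp_Final <= maxPitr) { // adapt this check to your own stop condition
          stopRequested = true
        }
      }
    })

    ssc.start()
    // Poll instead of blocking forever; stop gracefully once the condition has been met.
    while (!stopRequested && !ssc.awaitTerminationOrTimeout(5000)) {}
    if (stopRequested) {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}

The same flag could instead be checked from a StreamingListener callback such as onBatchCompleted, which is closer to the listener-based approach mentioned above; either way the context is stopped from the driver and nothing non-serializable leaks into a task closure.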

Please refer to my answer to this post to understand how to stop a streaming application based on a specific condition.