2017-08-31 17 views
0

想了解一個基本問題。這裏是我的代碼:Checkpointing使用不可序列化

def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = { 

val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration)) 

ssc 
} 

val ssc = StreamingContext.getOrCreate(sparkCheckpointDir,() => createStreamingContext(sparkCheckpointDir, batchDuration)) 


val inputDirectStream = EventHubsUtils.createDirectStreams(ssc,namespace,progressDir,Map(name -> eventhubParameters)).map(receivedRecord => new String(receivedRecord.getBody)) 


inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) => 
    val df = spark.read.json(rdd) 
    df.show(truncate=false) 

} 

ssc.start() 
ssc.awaitTermination() 

上面的代碼工作,我可以看到DF。

的問題是:如果我通過

def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = { 

val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration)) 
ssc.checkpoint(sparkCheckpointDir) 
ssc 
} 

然後ssc.start()使檢查點失敗,「

DSTREAM檢查點已啓用,但與他們的 功能DStreams是不可序列化「

我在做什麼錯了?我想要在啓用檢查點的情況下處理DF。

星火版本:版本2.0.2.2.5.4.2-7 啓動:火花殼--jars火花流-eventhubs_2.11-2.1.1.jar

回答

0

我認爲Why is my Spark Streaming application throwing a NotSerializableException when I enable checkpointing?會解決你的問題:

如果啓用星火流檢查點,然後在對象中名爲foreachRDD功能使用應該是序列化

解決方案:

  • 通過刪除jssc.checkpoint行來關閉檢查點。
  • 使對象被使用的可序列化。
  • 聲明NotSerializable的forEachRDD功能裏面,所以下面的代碼示例將被罰款:

在你的代碼,什麼是EventHubsUtils.createDirectStreams()在做什麼?也許你可以讓它可序列化。

+0

感謝您的意見。我已閱讀您建議的鏈接。 EventHubsUtils.createDirectStreams是針對事件中心的Direct Dstream創建。我怎樣才能使它可串行化? –

+0

也許你可以試試'extends Serializable'? –

相關問題