0
想了解一個基本問題。這裏是我的代碼:Checkpointing使用不可序列化
def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = {
val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
ssc
}
val ssc = StreamingContext.getOrCreate(sparkCheckpointDir,() => createStreamingContext(sparkCheckpointDir, batchDuration))
val inputDirectStream = EventHubsUtils.createDirectStreams(ssc,namespace,progressDir,Map(name -> eventhubParameters)).map(receivedRecord => new String(receivedRecord.getBody))
inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
val df = spark.read.json(rdd)
df.show(truncate=false)
}
ssc.start()
ssc.awaitTermination()
上面的代碼工作,我可以看到DF。
的問題是:如果我通過
def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = {
val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
ssc.checkpoint(sparkCheckpointDir)
ssc
}
然後ssc.start()使檢查點失敗,「
DSTREAM檢查點已啓用,但與他們的 功能DStreams是不可序列化「
我在做什麼錯了?我想要在啓用檢查點的情況下處理DF。
星火版本:版本2.0.2.2.5.4.2-7 啓動:火花殼--jars火花流-eventhubs_2.11-2.1.1.jar
感謝您的意見。我已閱讀您建議的鏈接。 EventHubsUtils.createDirectStreams是針對事件中心的Direct Dstream創建。我怎樣才能使它可串行化? –
也許你可以試試'extends Serializable'? –