我正在學習Spark Streaming。我想保持狀態更新,並能夠使用mapWithState更新狀態。我還在上下文中啓用了檢查點。如果我不得不停止/重新開始這項工作,我想記住這個國家。現在每次重啓都會重新開始計數。我嘗試了各種現金,檢查點選項,並通過大量發佈進行掃描,但沒有獲得清晰的圖像。如何記住Spark Streaming應用程序重新啓動之間的狀態?
環境: 我在開發中本地運行Spark,也作爲HDP沙箱運行。 (我在兩種環境中都嘗試過)。
是否有可能記住你殺死Spark作業並重新啓動它的狀態。 (沒有任何編程改變)。
如果可能怎麼辦?任何指針或建議都會有所幫助。 (我嘗試了chekpoints,緩存在單獨的RDD上,MapwithStateRDD在本地和HDP sanbox上)。
我沒有嘗試的唯一選擇是將MapWithStateRDD保存到磁盤並將其讀回爲initialRDD。無論如何,不會覺得這是正確的選擇。
我只發現了一個沒有答案的類似問題。 Spark Checkpoint doesn't remember state (Java HDFS)
謝謝。
代碼:
def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = {
val creatingFunc =() => streamingApp(sc, batchDuration)
val ssc = sc.getCheckpointDir match {
case Some(checkpointDir) =>
println("Get or Create Context")
StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true)
case None =>
print("New Context")
StreamingContext.getActiveOrCreate(creatingFunc)
}
sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
println(ssc.getState())
ssc
}
星火版本2.1.0
使用點校驗,但你說你啓用它,所以...你可以顯示代碼初始化StreamingContext並設置檢查點?什麼是Spark版本? –
謝謝你的快速回復Jacek。 – user2022329
btw:我還嘗試了一個課程材料的練習,它具有完整的檢查點代碼,並且表現相同。我想我可能不得不將狀態保存到HDF並在重新啓動時初始化。 – user2022329