2017-05-26 67 views
1

我正在學習Spark Streaming。我想保持狀態更新,並能夠使用mapWithState更新狀態。我還在上下文中啓用了檢查點。如果我不得不停止/重新開始這項工作,我想記住這個國家。現在每次重啓都會重新開始計數。我嘗試了各種現金,檢查點選項,並通過大量發佈進行掃描,但沒有獲得清晰的圖像。如何記住Spark Streaming應用程序重新啓動之間的狀態?

環境: 我在開發中本地運行Spark,也作爲HDP沙箱運行。 (我在兩種環境中都嘗試過)。

  1. 是否有可能記住你殺死Spark作業並重新啓動它的狀態。 (沒有任何編程改變)。

  2. 如果可能怎麼辦?任何指針或建議都會有所幫助。 (我嘗試了chekpoints,緩存在單獨的RDD上,MapwithStateRDD在本地和HDP sanbox上)。

  3. 我沒有嘗試的唯一選擇是將MapWithStateRDD保存到磁盤並將其讀回爲initialRDD。無論如何,不​​會覺得這是正確的選擇。

我只發現了一個沒有答案的類似問題。 Spark Checkpoint doesn't remember state (Java HDFS)

謝謝。

代碼:

def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = { 
    val creatingFunc =() => streamingApp(sc, batchDuration) 
    val ssc = sc.getCheckpointDir match { 
     case Some(checkpointDir) => 
     println("Get or Create Context") 
     StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true) 
     case None => 
     print("New Context") 
     StreamingContext.getActiveOrCreate(creatingFunc) 
    } 
    sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp)) 
    println(ssc.getState()) 
    ssc 
    } 

星火版本2.1.0

+0

使用點校驗,但你說你啓用它,所以...你可以顯示代碼初始化StreamingContext並設置檢查點?什麼是Spark版本? –

+0

謝謝你的快速回復Jacek。 – user2022329

+0

btw:我還嘗試了一個課程材料的練習,它具有完整的檢查點代碼,並且表現相同。我想我可能不得不將狀態保存到HDF並在重新啓動時初始化。 – user2022329

回答

1

我得到它的工作...感謝以下Q/A。 [鏈接](Spark streaming not remembering previous state

我是缺少以下行updateStateByKey後

statefulActivity.checkpoint(分鐘(1))

與啓用檢查點目錄一起添加的持續時間記住在重新啓動狀態。

相關問題