2017-08-01 46 views
0

我正在處理基於輸入數據計算的某些聚合顯示實時儀表板的要求。使用Spark Streaming定期保存計算結果?

我剛剛開始探索Spark/Spark Streaming,我發現我們可以使用Spark Integration以微批實時計算並將其提供給UI儀表板。

我的查詢是,如果在Spark Integration作業啓動後的任何時候,它停止/或崩潰,當它出現時它將如何從它上次處理的位置恢復。我知道Spark維護着一個內部狀態,並且我們會爲每個我們收到的新數據更新狀態。但是,重啓時這種狀態不會消失。

我覺得我們可能需要定期保存運行總數/結果,以便Spark在重新啓動時通過從那裏獲取來恢復處理。但是,不知道我如何使用Spark Streaming來做到這一點。

但是,不確定Spark Streaming默認情況下是否確保數據不會丟失,因爲我剛開始使用它。

如果有人遇到類似的情況,您能否提供一些關於我如何解決這個問題的想法。

+1

我認爲你會在這裏找到一些答案:https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing – maasg

回答

0

Spark Streaming充當消費者應用程序。實時地從Kafka主題中提取數據,您可以將數據的偏移量存儲在某些數據存儲中。如果您正在閱讀Twitter流的數據,則也是如此。你可以按照下面的帖子來存儲偏移量,如果應用程序崩潰或重新啓動。

http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html

https://www.linkedin.com/pulse/achieving-exactly-once-semantics-kafka-application-ishan-kumar

1

要點:

  1. 使預寫日誌接收機
  2. 使檢查點

詳細

  1. 使WAL:設置spark.streaming.receiver.writeAheadLog.enable true
  2. 使檢查點

檢查站是定期編寫你的應用程序狀態,以可靠的存儲。而當你的應用程序失敗時,它可以從檢查點文件中恢復。 要寫出檢查點,這樣寫:

ssc.checkpoint("checkpoint.path") 

要個檢查點讀:在createContext功能

def main(args: Array[String]): Unit = { 
    val ssc = StreamingContext.getOrCreate("checkpoint_path",() => createContext()) 

    ssc.start() 
    ssc.awaitTermination() 
} 

,您應該創建SSC,做你自己的邏輯。例如:

def createContext(): StreamingContext = { 
    val conf = new SparkConf() 
    .setAppName("app.name") 
    .set("spark.streaming.stopGracefullyOnShutdown", "true") 

    val ssc = new StreamingContext(conf, Seconds("streaming.interval")) 
    ssc.checkpoint("checkpoint.path") 

    // your code here 

    ssc 
} 

以下是關於如何部署火花流媒體應用,包括驅動器/執行器故障中恢復必要的步驟文檔。

https://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#deploying-applications

相關問題