2016-11-03 75 views
1

被持久化到除了盤我已通過使用地圖與檢測重複的記錄和避免這種records..the功能狀態的函數寫入使用火花流的程序是如波紋管相似:火花流狀態到在存儲器

val trackStateFunc1 = (batchTime: Time, 
         key: String, 
         value: Option[(String,String)], 
         state: State[Long]) => { 
    if (state.isTimingOut()) { 
    None 
    } 
    else if (state.exists()) None 
    else { 
    state.update(1L) 
    Some(value.get) 
    } 
} 

val stateSpec1 = StateSpec.function(trackStateFunc1) 
//.initialState(initialRDD) 
.numPartitions(100) 
.timeout(Minutes(30*24*60)) 

我的記錄數可能很高,我保持暫停大約一個月。因此,記錄和按鍵數量可high..I想知道如果我能在另外的磁盤上保存這些國家的Memory..something像 「RDD.persist(StorageLevel.MEMORY_AND_DISK_SER)」

回答

0

我想知道如果我能在另外的磁盤上保存這些國家的 內存在星火

有狀態流自動獲取序列化到永久存儲,這就是所謂的checkpointing。當您運行有狀態的DStream時,您必須提供一個檢查點目錄,否則該圖將無法在運行時執行。

您可以通過DStream.checkpoint設置檢查點間隔。例如,如果你想將它設置爲每隔30秒:

inputDStream 
.mapWithState(trackStateFunc) 
.checkpoint(Seconds(30)) 
+0

我認爲你的回答是inaccurate..I意味着內部狀態DSTREAM(InternalMapWithStateDStream),它通過保持對火花每批(請參閱此鏈接:HTTPS ://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming/spark-streaming-mapwithstatedstreams.html)..默認情況下,只在內存中存在..我的問題是,如果國家是如此之大它不能被緩存在內存中..如果它引發異常或重新計算? – mahdi62