2016-11-27 93 views
2

我正在使用火花mapwithstate,但存儲空間不斷增加。spark-mapwithstate爲什麼存儲空間的數量仍然是20?

enter image description here

問題1:

內存9GB

MapPartitionsRDD大小x 20則可以減少這種規模?

問題2:

And in InternalMapWithStateDStream, storagelevel is fixed to MEMORY_ONLY. 

我想改變堅持(StorageLevel.MEMORY_ONLY_SER),因爲它的規模。可能嗎?

問題3:

Private [streaming] object InternalMapWithStateDStream { 
    Private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

我想減少這種檢查點值。可能嗎?

回答

2

MapPartitionsRDD內存大小9GB x 20您能縮小這一尺寸嗎?

它是9GB x 20,但它是如何分佈在您的羣集中的?您需要單擊RDD名稱以查看分佈式狀態。如果你想減少內存大小,你需要考慮一種有效的方式來表示你的數據。

我想更改爲保留(StorageLevel.MEMORY_ONLY_SER),因爲其大小爲 。可能嗎?

不,您不能覆蓋MapWithStateDStream的內存設置。

我想減少此檢查點值。可能嗎?

是的,你可以設置DStream小號檢查時間間隔:

dStream.mapWithState(spec).checkpoint(Seconds(4)) 
+0

謝謝你的回答。 ** 1。該集羣分佈良好**但您只需要最後一個狀態存儲,爲什麼還有剩餘的19個存儲? ** dStream.mapWithState(spec).checkpoint(秒(4))**不工作默認情況下運行10 interva'val kafkastateStream = chnlStream.mapWithState(stateSpec) kafkastateStream.checkpoint(Seconds(10 * 5)) val kafkaSnapshotStream = kafkastateStream.stateSnapshots()' –

+0

@HyunkeunLee *但你只需要最後一個狀態存儲,爲什麼你還剩下19個存儲?*這完全取決於你如何存儲狀態。新來的國家有*不同的鑰匙*?你在每次迭代中存儲新狀態嗎?向我們展示代碼。關於檢查點,如果它不使用自定義間隔,它應該運行每個批處理時間*默認間隔*。 –

相關問題