2016-03-16 23 views
14

我正在開發一個Scala(2.11)/ Spark(1.6.1)流項目並使用mapWithState()來跟蹤以前批次中顯示的數據。Spark Streaming mapWithState似乎會定期重建完整狀態

狀態分佈在多個節點上的20個分區中,使用StateSpec.function(trackStateFunc _).numPartitions(20)創建。在這種狀態下,我們只有幾個鍵(〜100)映射到Sets,最多有160.000個條目,這些條目在整個應用程序中都會增長。整個狀態達到3GB,可由集羣中的每個節點處理。在每個批次中,一些數據被添加到一個狀態,但是直到過程的最後纔會被刪除,即約15分鐘。

在遵循應用程序UI的同時,與其他批次相比,每10批次的處理時間都非常高。看到的影像:

The spikes show the higher processing time.

黃色字段代表高處理時間。

enter image description here

的更詳細的工作視圖顯示,在這些批次發生在某一點,恰好當所有20個分區「跳過」。或者這就是UI所說的。

enter image description here

我的skipped的理解是,每個狀態劃分是不執行一個任務可能,因爲它並不需要重新計算。但是,我不明白爲什麼每個作業中skips的數量有所不同,以及爲什麼最後一個作業需要這麼多處理。無論狀態的大小如何,處理時間都會更長,只會影響持續時間。

這是mapWithState()功能中的錯誤還是這種打算的行爲?基礎數據結構是否需要某種重新洗牌,在該州的Set是否需要複製數據?或者更可能是我的應用程序中的缺陷?

回答

9

這是一個在mapWithState()函數中的bug還是這個打算的 行爲?

這是打算的行爲。你看到的高峯是因爲你的數據在給定的批次結束時正在檢查點。如果你會注意到更長時間的批次,你會發現它每隔100秒持續發生一次。這是因爲檢查點時間是恆定的,並且根據您的batchDuration來計算,除非您明確設置DStream.checkpoint間隔,否則您與數據源通話的頻率是多少,以讀取批次乘以某個常數。

這裏是MapWithStateDStream相關的一段代碼:

​​

其中DEFAULT_CHECKPOINT_DURATION_MULTIPLIER是:

private[streaming] object InternalMapWithStateDStream { 
    private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

哪個排隊正好與你所看到的行爲,因爲你讀的批次時間爲每10秒=> 10 * 10 = 100秒。

這是正常現象,這是使用Spark持久保存狀態的代價。你的優化可以考慮如何儘可能地減少你必須保存在內存中的狀態的大小,以便這個序列化儘可能快。另外,確保數據遍佈足夠的執行者,以便狀態在所有節點之間均勻分佈。另外,我希望你已經打開Kryo Serialization而不是默認的Java序列化,這可以給你一個有意義的性能提升。

+0

在我來說,我可以看到,每一項工作都在檢查點設置該批次。爲什麼不只是最後一份工作? 你有什麼解決方案來關注國家的規模?爲了能夠優化它。 – crak

+0

@crak什麼是您的檢查點間隔?你怎麼看到每項工作都檢查數據? –

+0

每10批次。我的眼睛是虐待,我有16個工作做檢查站。這是邏輯,我有12個mapWithState,我可以在spark ui中看到足跡。但不知道哪一個尺寸最大。 mapWithState商店只是狀態不像以前的植入? – crak

1

除了接受的答案,指出與檢查點相關的序列化的價格之外,還有另一個不太爲人所知的問題,可能會影響spikey行爲:驅逐已刪除的狀態。

具體而言,'刪除'或'超時'狀態不會立即從映射中刪除,而是被標記爲刪除,並且僅在序列化過程中才會被刪除[在Spark 1.6.1中,請參閱writeObjectInternal()]。

這有兩個性能影響,僅每10批做一次發生:

  1. 遍歷和刪除過程都有它的價格
  2. 如果處理超時/刪除的事件,例如流其保存到外部存儲設備,所有10個批次的相關費用將只在這一點上支付(而不是人們可能會認爲,每個RDD)
相關問題