我正在開發一個Scala(2.11)/ Spark(1.6.1)流項目並使用mapWithState()
來跟蹤以前批次中顯示的數據。Spark Streaming mapWithState似乎會定期重建完整狀態
狀態分佈在多個節點上的20個分區中,使用StateSpec.function(trackStateFunc _).numPartitions(20)
創建。在這種狀態下,我們只有幾個鍵(〜100)映射到Sets
,最多有160.000個條目,這些條目在整個應用程序中都會增長。整個狀態達到3GB
,可由集羣中的每個節點處理。在每個批次中,一些數據被添加到一個狀態,但是直到過程的最後纔會被刪除,即約15分鐘。
在遵循應用程序UI的同時,與其他批次相比,每10批次的處理時間都非常高。看到的影像:
黃色字段代表高處理時間。
的更詳細的工作視圖顯示,在這些批次發生在某一點,恰好當所有20個分區「跳過」。或者這就是UI所說的。
我的skipped
的理解是,每個狀態劃分是不執行一個任務可能,因爲它並不需要重新計算。但是,我不明白爲什麼每個作業中skips
的數量有所不同,以及爲什麼最後一個作業需要這麼多處理。無論狀態的大小如何,處理時間都會更長,只會影響持續時間。
這是mapWithState()
功能中的錯誤還是這種打算的行爲?基礎數據結構是否需要某種重新洗牌,在該州的Set
是否需要複製數據?或者更可能是我的應用程序中的缺陷?
在我來說,我可以看到,每一項工作都在檢查點設置該批次。爲什麼不只是最後一份工作? 你有什麼解決方案來關注國家的規模?爲了能夠優化它。 – crak
@crak什麼是您的檢查點間隔?你怎麼看到每項工作都檢查數據? –
每10批次。我的眼睛是虐待,我有16個工作做檢查站。這是邏輯,我有12個mapWithState,我可以在spark ui中看到足跡。但不知道哪一個尺寸最大。 mapWithState商店只是狀態不像以前的植入? – crak