2014-11-01 32 views
3

我寫了一個簡單的函數來使用updateStateByKey,以查看問題是否因爲我的updateFunc。我認爲這一定是由於別的。我在 - 本地運行這個[4]。爲什麼在使用updateStateByKey時任務大小不斷增長?

val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
    Some(1) 
} 

val state = test.updateStateByKey[Int](updateFunc) 

過了一段時間,有警告,任務大小不斷增加。

WARN TaskSetManager:Stage x包含一個非常大的任務(129 KB)。建議的最大任務大小爲100 KB。

WARN TaskSetManager:Stage x包含一個非常大的任務(131 KB)。建議的最大任務大小爲100 KB。

+0

可能你的狀態正在增長 – maasg 2014-11-04 09:04:14

回答

0

您的流中有越來越多不同的密鑰,每個密鑰都會導致1的新副本被添加到您的狀態。

當前updateStateByKey掃描每個批處理間隔中的每個鍵,即使該鍵沒有數據。這會導致updateStateByKey的批處理時間隨着狀態中鍵的數量而增加,即使數據速率保持不變,即

有一個proposal to solve this

相關問題