我有一個Spark Streaming應用程序正在運行,它使用mapWithState函數來跟蹤RDD的狀態。 應用程序運行罰款幾分鐘,但然後用爲什麼火花工的內存使用量會隨着時間而增加?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373
崩潰我觀察到的星火應用增加了內存使用情況隨時間線性,即使我已經爲mapWithStateRDD超時。請參閱下面的代碼片段和內存使用情況 -
val completedSess = sessionLines
.mapWithState(StateSpec.function(trackStateFunction _)
.numPartitions(80)
.timeout(Minutes(5)))
爲什麼要增加內存隨時間線性如果每個RDD明確超時?
我試圖增加內存,但沒關係。我錯過了什麼?
編輯 - 代碼參考
高清trackStateFunction(batchTime:時間,關鍵:字符串值:選項[字符串],狀態:國發[(布爾,列表[字符串]龍)]):選項[ (布爾,列表[字符串])] = {
def updateSessions(newLine: String): Option[(Boolean, List[String])] = {
val currentTime = System.currentTimeMillis()/1000
if (state.exists()) {
val newLines = state.get()._2 :+ newLine
//check if end of Session reached.
// if yes, remove the state and return. Else update the state
if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) {
state.remove()
Some(true, newLines)
}
else {
val newState = (false, newLines, currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
else {
val newState = (false, List(value.get), currentTime)
state.update(newState)
Some(state.get()._1, state.get()._2)
}
}
value match {
case Some(newLine) => updateSessions(newLine)
case _ if state.isTimingOut() => Some(true, state.get()._2)
case _ => {
println("Not matched to any expression")
None
}
}
}
您有多少傳入流量?多少RAM /磁盤?我們需要更多信息。 –
另外,您有多久檢查一次? –
我有一個由4名工作人員組成的集羣(8個內核,32 GB RAM,每個128 GB SSD)。來自Kinesis Stream的傳入流量爲10-15 MB/s。批處理間隔爲10秒。檢查點間隔爲60s – cmbendre