是我緩存的理解錯了嗎?在我所有的轉換之後,得到的RDD非常小,比如1GB。它計算的數據非常大,大小約700 GB。星火電子病歷「超出內存限制」可用於檢查點/緩存工作
我要運行的邏輯閱讀成千上萬的相當大的文件,所有計算小得多導致RDD。每次迭代都會處理下一批400個文件,這些文件在讀入時可能會炸燬大約700 GB的大小。傳入的RDD以相同的方式進行處理(讀取和轉換),然後與積累的RDD合併。 I 緩存和檢查點每次迭代後(也是非運行(阻塞= true)舊版本的結果rdd),以便我可以削減RDD譜系,這樣我就不必重新計算結果出錯,並節省執行人員的空間。 所以,我想,在任何時候我真的只需要1 GB *迭代+〜750GB的內存總容量爲我的工作,而1.6 TB應該是綽綽有餘的數量。但顯然我誤解了一些東西。
在每次迭代中,GC的時間越來越長。 Spark UI顯示執行者位於紅色區域(在GC上花費時間超過10%)。然後,整個工作或許未能在第三或第四迭代與像內存限制超過消息,失落執行人/無路徑執行人,和紗線殺死了我的執行人。我認爲通過緩存和檢查點,我爲執行者節省了大量空間。我只是不明白是否有某種內存泄漏? 爲什麼內存繼續填滿?
我在EMR運行星火2.1.1 m3.large實例。我的羣集大小限制在〜1.6TB。我用下面的配置中運行:
driver-memory 8g
deploy-mode cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=100
spark.dynamicAllocation.maxExecutors=200
spark.shuffle.service.enabled=true
executor-cores 4
executor-memory 8g
什麼我的代碼看起來有點像:
var accRdd = <empty>
val batchSize = 400
var iteration = 1
filesToIngest.grouped(batchSize).foreach {
val transformedRdd = transform(accRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
)
val oldAccRdd = accRdd
accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) =>
combine(row1, row2)
).coalesce(5 + i)
accRdd.persist(MEMORY_AND_DISK_SER)
accRdd.checkpoint()
oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory
log_info(s"Total row count on iteration: ${accRdd.count()}")
iteration += 1
}
我已經按照此建議:https://github.com/deeplearning4j/nd4j/issues/1251,並正嘗試以避免調整其它配置變量相關以gc,記憶分數和jvm。再次,我正在尋找對可能發生的事情的解釋,以及我對緩存/檢查點的假設可能是錯誤的。 謝謝!