0

是我緩存的理解錯了嗎?在我所有的轉換之後,得到的RDD非常小,比如1GB。它計算的數據非常大,大小約700 GB。星火電子病歷「超出內存限制」可用於檢查點/緩存工作

我要運行的邏輯閱讀成千上萬的相當大的文件,所有計算小得多導致RDD。每次迭代都會處理下一批400個文件,這些文件在讀入時可能會炸燬大約700 GB的大小。傳入的RDD以相同的方式進行處理(讀取和轉換),然後與積累的RDD合併。 I 緩存和檢查點每次迭代後(也是非運行(阻塞= true)舊版本的結果rdd),以便我可以削減RDD譜系,這樣我就不必重新計算結果出錯,並節省執行人員的空間。 所以,我想,在任何時候我真的只需要1 GB *迭代+〜750GB的內存總容量爲我的工作,而1.6 TB應該是綽綽有餘的數量。但顯然我誤解了一些東西。

在每次迭代中,GC的時間越來越長。 Spark UI顯示執行者位於紅色區域(在GC上花費時間超過10%)。然後,整個工作或許未能在第三或第四迭代與像內存限制超過消息,失落執行人/無路徑執行人,和紗線殺死了我的執行人。我認爲通過緩存和檢查點,我爲執行者節省了大量空間。我只是不明白是否有某種內存泄漏? 爲什麼內存繼續填滿?

我在EMR運行星火2.1.1 m3.large實例。我的羣集大小限制在〜1.6TB。我用下面的配置中運行:

driver-memory 8g 
deploy-mode cluster 
spark.dynamicAllocation.enabled=true 
spark.dynamicAllocation.minExecutors=100 
spark.dynamicAllocation.maxExecutors=200 
spark.shuffle.service.enabled=true 
executor-cores 4 
executor-memory 8g 

什麼我的代碼看起來有點像:

var accRdd = <empty> 
val batchSize = 400 
var iteration = 1 
filesToIngest.grouped(batchSize).foreach { 
    val transformedRdd = transform(accRdd).reduceByKey((row1, row2) => 
     combine(row1, row2) 
    ) 
    val oldAccRdd = accRdd 
    accRdd = accRdd.union(transformedRdd).reduceByKey((row1, row2) => 
     combine(row1, row2) 
    ).coalesce(5 + i) 
    accRdd.persist(MEMORY_AND_DISK_SER) 
    accRdd.checkpoint() 
    oldAccRdd.unpersist(blocking = true) // I assume this will ensure all references to this cleared from memory 
    log_info(s"Total row count on iteration: ${accRdd.count()}") 
    iteration += 1 
} 

我已經按照此建議:https://github.com/deeplearning4j/nd4j/issues/1251,並正嘗試以避免調整其它配置變量相關以gc,記憶分數和jvm。再次,我正在尋找對可能發生的事情的解釋,以及我對緩存/檢查點的假設可能是錯誤的。 謝謝!

回答

0

你可能想看看一些從我們的記憶頁的建議: https://deeplearnin4j.org/memory

https://deeplearning4j.org/spark

一般來說,deeplearning4j都有自己的堆外的內存。你應該使用更多的執行器來設置較小的批處理大小,注意javacpp關閉堆配置,並將火花內存設置爲允許的範圍。