2017-08-10 19 views
3

我正試圖在Spark中開發一個重型數學計算,無論是在時間還是內存方面(兩者都達到O(n^2))。我發現持有Iterator的分區對於大型微積分來說並不足夠,因爲它強制實例化(儘管懶惰,因爲它是一個Iterator)每行一個對象。事實上,在一個最簡單的情況下,例如每行都會保存一個向量。但是它對內存都是有害的,因爲我們知道對象的JVM開銷和GC上的所有壓力以及速度,因爲我真的可以提高性能,將我的線性代數操作提高到BLAS級別3(matrix by矩陣,而不是矩陣的矢量,我堅持在這個範例中)。在一個非常簡略這裏就是我想要實現:Spark:斷開分區迭代器以實現更好的內存管理?

while (???) { // loop over some condition, doesn't really matter what 
    val matrix = ??? // an instance of a matrix 
    val broadMatrix = sparkContext.broadcast(matrix) 
    // rdd is an instance of RDD[Vector] that is already cached 
    rdd.mapPartition { 
     iter => 
      val matrixValue = broadMatrix.value() 
      iter.map (vector => matrixValue * vec) 
    } 
    // a bunch of other things relying on that result 
} 

這裏是我的想法:

  1. 我的rdd在上面的代碼緩存,然後有一個Iterator也沒用,是不是它?由於它唯一的優點不是同時記住所有行:但是在這裏它已經被計算和緩存,所以所有的行都保存在內存中......當然可以爭辯說,Spark的可能有一個智能緩存序列化和壓縮數據(我懷疑當存儲級別爲MEMORY_ONLY雖然...)。

  2. 如果1.是真的,那麼它產生的唯一東西是巨大的內存開銷,因爲我擁有與我的rdd中的行一樣多的JVM對象,但是我可以將它降低到每個分區的單個JVM對象。我甚至可以將它降低到一個對象,每個對象具有一個scala object,它可以作爲共享內存用於同一個執行器上的所有分區(我擔心這可能很難處理,因爲我想保留Spark的彈性,因此如果一個分區因任何原因被刪除並重新出現在另一個執行器上,我不想自己處理它,但讓Spark將所有相關的對象自己移動......)。

我的想法,因此將vectorrdd轉化爲一個包含矩陣,是這樣的:已經面臨這個的dilemna

while (???) { // loop over some condition, doesn't really matter what 
    val matrix = ??? // an instance of a matrix 
    val broadMatrix = sparkContext.broadcast(matrix) 
    // rdd is an instance of RDD[Vector] that is already cached 
    rdd.mapPartition { 
     iter => 
      val matrixValue = broadMatrix.value() 
      // iter actually contains one single element which is the matrix containing all vectors stacked 
      // here we have a BLAS-3 operation 
      iter.map (matrix => matrixValue * matrix) 
    } 
    // a bunch of other things relying on that result 
} 

有人嗎?你有沒有經歷過這種內存管理的提前使用?

+0

我可以想象,即使rdd被緩存,如果您更改爲Iterator爲非惰性集合,則spark會再次消耗內存(兩次)。我想你應該嘗試使用一些基準測試來更好地使用哪個版本 –

+0

我想像Spark確實會消耗兩次內存......我會嘗試一下,看看我是否有更好的表現。我會盡快給出基準的結果! –

回答

1

因爲我真的可以提高性能,改善我的線性代數運算直到BLAS級別3(矩陣通過矩陣,而不是通過我在此範例中卡住的向量的矩陣)。

使用Iterators不會強迫你以任何方式使用Vectors,或者爲每個分區甚至超過一個元素。如果需要,您可以輕鬆地爲每個分割創建一個單獨的Matrix對象。

有害的內存,因爲我們知道在JVM開銷對象和被放在GC

我要說,這是比這更復雜的所有壓力。使用Iterators的原因是能夠處理大於內存的分區。使用懶惰Iterators和小對象Spark可以將部分結果泄漏到磁盤並​​使其可用於垃圾收集。這在使用單個大對象時不會發生。根據我的經驗,Spark對於大型物體的GC問題更容易受到影響。

基於描述,我懷疑它應該是有意義的,以避免顯式存儲數據,而是使用堆內存明確地初始化對象。這應該保持GC在海灣,並允許您處理大型物體。但是可以付出等級的方式。