我正試圖在Spark中開發一個重型數學計算,無論是在時間還是內存方面(兩者都達到O(n^2)
)。我發現持有Iterator
的分區對於大型微積分來說並不足夠,因爲它強制實例化(儘管懶惰,因爲它是一個Iterator
)每行一個對象。事實上,在一個最簡單的情況下,例如每行都會保存一個向量。但是它對內存都是有害的,因爲我們知道對象的JVM開銷和GC上的所有壓力以及速度,因爲我真的可以提高性能,將我的線性代數操作提高到BLAS級別3(matrix by矩陣,而不是矩陣的矢量,我堅持在這個範例中)。在一個非常簡略這裏就是我想要實現:Spark:斷開分區迭代器以實現更好的內存管理?
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
iter.map (vector => matrixValue * vec)
}
// a bunch of other things relying on that result
}
這裏是我的想法:
我的
rdd
在上面的代碼緩存,然後有一個Iterator
也沒用,是不是它?由於它唯一的優點不是同時記住所有行:但是在這裏它已經被計算和緩存,所以所有的行都保存在內存中......當然可以爭辯說,Spark的可能有一個智能緩存序列化和壓縮數據(我懷疑當存儲級別爲MEMORY_ONLY
雖然...)。如果1.是真的,那麼它產生的唯一東西是巨大的內存開銷,因爲我擁有與我的
rdd
中的行一樣多的JVM對象,但是我可以將它降低到每個分區的單個JVM對象。我甚至可以將它降低到一個對象,每個對象具有一個scalaobject
,它可以作爲共享內存用於同一個執行器上的所有分區(我擔心這可能很難處理,因爲我想保留Spark的彈性,因此如果一個分區因任何原因被刪除並重新出現在另一個執行器上,我不想自己處理它,但讓Spark將所有相關的對象自己移動......)。
我的想法,因此將vector
這rdd
轉化爲一個包含矩陣,是這樣的:已經面臨這個的dilemna
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
// iter actually contains one single element which is the matrix containing all vectors stacked
// here we have a BLAS-3 operation
iter.map (matrix => matrixValue * matrix)
}
// a bunch of other things relying on that result
}
有人嗎?你有沒有經歷過這種內存管理的提前使用?
我可以想象,即使rdd被緩存,如果您更改爲Iterator爲非惰性集合,則spark會再次消耗內存(兩次)。我想你應該嘗試使用一些基準測試來更好地使用哪個版本 –
我想像Spark確實會消耗兩次內存......我會嘗試一下,看看我是否有更好的表現。我會盡快給出基準的結果! –