Imp。提示:
只要你有重量級的初始化應該對許多RDD
元素,而不是每個RDD
元素進行一次 完成,如果這 初始化,如創建從第三方 庫對象,不能被序列化(以便Spark可以通過集羣上的 將其傳輸到工作節點),使用mapPartitions()
而不是 map()
。 mapPartitions()
規定初始化完成 每個工作者任務/線程/分區而不是一次RDD
數據 元素爲example :見下文。
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2。 確實flatMap
表現如同地圖或像mapPartitions
?
是的。請參閱flatmap
的示例2 ..其自我解釋。
Q1。什麼之間的RDD的map
和mapPartitions
map
的作品被使用在每個單元級的功能,而 mapPartitions
行使在分區級別功能的差異。
示例方案:如果我們有在特定RDD
分區100K元件那麼我們將會觸發關閉功能正在使用的映射變換100K時候,我們使用map
。相反,如果我們使用mapPartitions
,那麼我們將只調用一次特定函數,但是我們將傳遞所有100K記錄並在一次函數調用中取回所有響應。
自從map
在特定函數上工作很多次以來,性能會有所提高,特別是如果函數每次都會花費很多代價,如果我們一次傳入所有元素,案例mappartitions
)。
地圖
適用於RDD的每個項目轉換函數,並返回 結果作爲新RDD。
清單異體
DEF映射[U:ClassTag](F:T => U):RDD [U]
實施例:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
這是一個專門的地圖,被稱爲每個分區只有一次。 通過輸入參數(Iterarator [T]),各個分區的全部內容可作爲 順序值的數據流提供。 自定義函數必須返回另一個Iterator [U]。結合的 結果迭代器會自動轉換爲新的RDD。請注意,由於我們選擇了分區,因此下面的 結果中缺少元組(3,4)和(6,7)。
preservesPartitioning
指示輸入功能是否保留 分割器,這應該是false
除非這是一雙RDD和輸入 函數不修改的鍵。
清單異體
DEF mapPartitions [U:ClassTag](F:迭代[T] =>迭代[U], preservesPartitioning:布爾=假):RDD [U]
實施例1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
實施例2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
上述程序還可以使用flatMap如下寫入。使用flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
結論
例2:
mapPartitions
轉型比map
更快,因爲它要求你的函數次/分,沒有一次/元..
閱讀下面的後回答,你可以看看[經驗]由實際使用它的人共享(https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/) https:// bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ – Abhidemon