2014-01-17 145 views
81

RDD'smapmapPartitions方法有什麼區別? flatMap的行爲如同map還是像mapPartitions?謝謝。Apache Spark:map vs mapPartitions?

(編輯) 即有什麼區別(無論是語義或執行方面)

def map[A, B](rdd: RDD[A], fn: (A => B)) 
       (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { 
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) }, 
     preservesPartitioning = true) 
    } 

之間:

def map[A, B](rdd: RDD[A], fn: (A => B)) 
       (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = { 
    rdd.map(fn) 
    } 
+2

閱讀下面的後回答,你可以看看[經驗]由實際使用它的人共享(https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/) https:// bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ – Abhidemon

回答

78

什麼是RDD的地圖和mapPartitions之間的區別方法?

方法map源RDD的每個元件轉換成通過應用函數RDD結果的單個元素。 mapPartitions將源RDD的每個分區轉換爲結果的多個元素(可能沒有)。

並且flatMap的行爲像map還是像mapPartitions?

都不是,flatMap作品的單個元件上(如map)和產生的結果的多個元件(如mapPartitions)。

+1

謝謝 - 也是如此MAP原因洗牌(或以其他方式改變分區的數量)?它是否在節點之間移動數據?我一直在使用mapPartitions來避免在節點之間移動數據,但不確定flapMap是否會這樣做。 –

+0

如果你看看源 - https://github.com/apache/incubator-spark/blob/97ac06018206b593600594605be241d0cd706e08/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala和https:/ /github.com/apache/incubator-spark/blob/97ac06018206b593600594605be241d0cd706e08/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala - 既'map'和'flatMap'具有完全相同的分區,同父母。 –

+11

作爲一個說明,通過揚聲器在2013舊金山星火峯會(goo.gl/JZXDCR)提供的演示重點介紹了單位記錄開銷任務執行與mapPartition比與地圖轉化更好。據介紹,這是由於設立新任務的成本高昂。 –

48

Imp。提示:

只要你有重量級的初始化應該對許多RDD元素,而不是每個RDD元素進行一次 完成,如果這 初始化,如創建從第三方 庫對象,不能被序列化(以便Spark可以通過集羣上的 將其傳輸到工作節點),使用mapPartitions()而不是 map()mapPartitions()規定初始化完成 每個工作者任務/線程/分區而不是一次RDD數據 元素爲example :見下文。

val newRd = myRdd.mapPartitions(partition => { 
    val connection = new DbConnection /*creates a db connection per partition*/ 

    val newPartition = partition.map(record => { 
    readMatchingFromDB(record, connection) 
    }).toList // consumes the iterator, thus calls readMatchingFromDB 

    connection.close() // close dbconnection here 
    newPartition.iterator // create a new iterator 
}) 

Q2。 確實flatMap表現如同地圖或像mapPartitions

是的。請參閱flatmap的示例2 ..其自我解釋。

Q1。什麼之間的RDD的mapmapPartitions

map的作品被使用在每個單元級的功能,而 mapPartitions行使在分區級別功能的差異。

示例方案如果我們有在特定RDD分區100K元件那麼我們將會觸發關閉功能正在使用的映射變換100K時候,我們使用map。相反,如果我們使用mapPartitions,那麼我們將只調用一次特定函數,但是我們將傳遞所有100K記錄並在一次函數調用中取回所有響應。

自從map在特定函數上工作很多次以來,性能會有所提高,特別是如果函數每次都會花費很多代價,如果我們一次傳入所有元素,案例mappartitions)。

地圖

適用於RDD的每個項目轉換函數,並返回 結果作爲新RDD。

清單異體

DEF映射[U:ClassTag](F:T => U):RDD [U]

實施例:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) 
val b = a.map(_.length) 
val c = a.zip(b) 
c.collect 
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

這是一個專門的地圖,被稱爲每個分區只有一次。 通過輸入參數(Iterarator [T]),各個分區的全部內容可作爲 順序值的數據流提供。 自定義函數必須返回另一個Iterator [U]。結合的 結果迭代器會自動轉換爲新的RDD。請注意,由於我們選擇了分區,因此下面的 結果中缺少元組(3,4)和(6,7)。

preservesPartitioning指示輸入功能是否保留 分割器,這應該是false除非這是一雙RDD和輸入 函數不修改的鍵。

清單異體

DEF mapPartitions [U:ClassTag](F:迭代[T] =>迭代[U], preservesPartitioning:布爾=假):RDD [U]

實施例1

val a = sc.parallelize(1 to 9, 3) 
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { 
    var res = List[(T, T)]() 
    var pre = iter.next 
    while (iter.hasNext) 
    { 
    val cur = iter.next; 
    res .::= (pre, cur) 
    pre = cur; 
    } 
    res.iterator 
} 
a.mapPartitions(myfunc).collect 
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

實施例2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) 
def myfunc(iter: Iterator[Int]) : Iterator[Int] = { 
    var res = List[Int]() 
    while (iter.hasNext) { 
    val cur = iter.next; 
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) 
    } 
    res.iterator 
} 
x.mapPartitions(myfunc).collect 
// some of the number are not outputted at all. This is because the random number generated for it is zero. 
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

上述程序還可以使用flatMap如下寫入。使用flatmap

val x = sc.parallelize(1 to 10, 3) 
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect 

res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

結論

例2:

mapPartitions轉型比map更快,因爲它要求你的函數次/分,沒有一次/元..

+2

我知道你可以使用'map'或'mapPartitions'來實現相同的結果(參見問題中的兩個例子);這個問題是關於你爲什麼選擇一種方式。其他答案中的評論非常有用!此外,你沒有提到'map'和'flatMap'將'false'傳遞給'preservesPartitioning',以及它的含義是什麼。 –

+1

函數每次執行與函數執行一次parition是我失蹤的鏈接。一次使用mapPartition訪問多個數據記錄是非常寶貴的事情。欣賞答案 –

+0

有沒有'map'比'mapPartitions'更好的場景?如果'mapPartitions'非常好,爲什麼它不是默認的地圖實現? – ruhong

4

地圖

  1. 它處理在一次一行,非常類似於映射()的MapReduce的方法。
  2. 您從每一行後的轉換中返回。

MapPartitions

  1. 它處理完整區塊一氣呵成。
  2. 您可以處理整個分區後從函數返回只有一次。
  3. 所有的中間結果需要在內存中保存,直到你處理整個分區。
  4. 提供您喜歡的設置()圖()和清理()的MapReduce功能

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Maphttp://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

+0

- 如果您正在執行迭代器到迭代器的轉換,而沒有將迭代器實現爲某種類型的集合,那麼您將不必將整個分區保存在內存中,實際上,這種方式可以實現spark將部分分區泄露到磁盤。 – ilcord

+0

你不必在內存中保存整個分區,但結果。直到處理完整個分區後才能返回結果 – KrazyGautam