2015-10-19 99 views
9

當我需要對RDD中的數據進行分組時,我總是使用reduceByKey,因爲它在混洗數據之前執行map side reduce,這通常意味着更少的數據被混洗,從而獲得更好的性能。即使地圖側減少功能收集所有值並且實際上並未減少數據量,我仍然使用reduceByKey,因爲我假設reduceByKey的性能永遠不會比groupByKey差。但是,我想知道這個假設是否正確,或者確實存在groupByKey應該是首選的情況?groupByKey比reduceByKey更受歡迎

+0

從下面我得到的答案(並感謝你的),@eliasah說,'groupByKey'只是語法糖,而@climbage認爲'reduceByKey '如果我用它複製'groupByKey'功能可能會稍微慢一些。我想我會實際嘗試在一些示例上測試這兩個函數:) –

+0

http://stackoverflow.com/questions/30825936/when-should-groupbykey-api-used-in-spark-programming – Knight71

+0

唯一一次我'需要使用groupByKey來計算取決於前一個值的數據樣本。預先計算的運行總數就是一個例子。 GPS距離。等 – pestilence669

回答

13

我相信有由climbageeliasah忽略了這個問題的另一個方面:

  • 代碼的可讀性
  • 代碼的可維護性
  • 代碼庫大小

如果操作不減少量的數據必須以某種方式在語義上等同於GroupByKey。讓我們假設我們有RDD[(Int,String)]

import scala.util.Random 
Random.setSeed(1) 

def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("") 

val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString))) 

,我們想連接所有字符串對於給定的關鍵。隨着groupByKey這是很簡單的:

rdd.groupByKey.mapValues(_.mkString("")) 

reduceByKey樸素的解決方案是這樣的:

rdd.reduceByKey(_ + _) 

它是短,可以說是很容易理解,但是從兩個問題遭遇:

  • 是效率極低,因爲它每次都創建一個新的String對象*
  • 暗示臨時執行T速度比它在現實中是不太昂貴的,特別是如果你只分析DAG或調試字符串

對付,我們需要一個可變的數據結構中的第一個問題:

import scala.collection.mutable.StringBuilder 

rdd.combineByKey[StringBuilder](
    (s: String) => new StringBuilder(s), 
    (sb: StringBuilder, s: String) => sb ++= s, 
    (sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2) 
).mapValues(_.toString) 

它仍然建議其他事情真的在進行,並且非常冗長,尤其是如果在腳本中重複多次。當然,你可以提取匿名函數

val createStringCombiner = (s: String) => new StringBuilder(s) 
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s 
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) => 
    sb1.append(sb2) 

rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners) 

,但在一天結束的時候它仍然意味着額外的努力來理解這個代碼,增加了複雜性並沒有真正的附加值。我覺得特別麻煩的一件事是明確包含可變數據結構。即使Spark處理幾乎所有的複雜性,也意味着我們不再擁有一個優雅的,引用透明的代碼。

我的觀點是如果你真的通過使用reduceByKey來減少數據量。否則,你會使自己的代碼難以寫入,難以分析並且無法獲得任何回報。

注意

這個答案是專注於Scala的API。目前的Python實現與JVM的實現完全不同,並且包含優化,這些優化在類似於groupBy的操作的情況下提供比天真的reduceByKey實現更大的優勢。


*請參閱Spark performance for Scala vs Python一個有說服力的例子

+0

這些都是非常有效的點,實際上我會再次開始使用'groupByKey'(當數據量不能減少!)。謝謝,@ zero323。 –

+0

我無法提供更好的!歡呼的人:) – eliasah

+2

謝謝你們,我真的很感激它:) – zero323

2

根據代碼文檔,我不會發明輪子,groupByKey操作會將RDD中每個鍵的值分組爲單個序列,該序列還允許通過傳遞來控制所得鍵值對RDD的分區一個Partitioner

此操作可能非常昂貴。如果您正在進行分組以執行每個密鑰的彙總(例如總計或平均值),則使用aggregateByKeyreduceByKey將提供更好的性能。

注意:按照當前的實施,groupByKey必須能夠保存內存中任意鍵的所有鍵值對。如果某個鍵的值太多,可能會導致OOME。實際上,我更喜歡combineByKey操作,但如果您不熟悉map-reduce範例,則很難理解組合器和合並的概念。爲此,您可以閱讀雅虎地圖 - 減少聖經here,這很好地解釋了這個話題。

欲瞭解更多信息,我建議您閱讀PairRDDFunctions code

+0

爲什麼這是低票? – eliasah

+0

我明白與'groupByKey'相關的可能問題(例如給定鍵的值太多) - 問題是如果有時候'groupByKey'實際上是更好的選擇。您提到使用'groupByKey'時可以控制生成的鍵值對的分區,但也可以使用'reduceByKey'進行控制,這似乎不是使用'groupByKey'的原因,或者是我誤解你了? –

+1

這是完全正確的,您可以將'groupByKey'視爲語法糖。如果你可以避免它,那麼使用aggregateByKey,reduceByKey或combineByKey – eliasah

5

reduceByKeygroupByKey都使用combineByKey與不同的組合/合併語義。

我看到的關鍵差異是groupByKey將標誌(mapSideCombine=false)傳遞給洗牌引擎。根據問題SPARK-772判斷,這是洗牌引擎在數據大小不會改變時不運行映射合併器的暗示。

所以,我要說,如果你要使用reduceByKey複製groupByKey,你可能會看到一個輕微的性能命中。