2017-04-20 52 views
1

你好我經常需要在我的代碼中使用groupByKey,但我知道這是一個非常繁重的操作。由於我正在努力提高性能,我想知道我的方法是否有效地移除所有groupByKey調用。用spark中的reduceByKey替換groupByKey

我被用來創建從另一個RDD的RDD和創建對類型(INT,INT)

rdd1 = [(1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)]

,因爲我需要獲得這樣的事情:

[(1, [2, 3]), (2 , [3, 4]), (3, [5])]

我使用的是out = rdd1.groupByKey,但由於這種方法可能是非常大的數據集問題,我認爲使用這種解決方案:

而不是創造我對類型(INT,INT)我要做的就是創建成對類型的它的RDD rdd1(智力,列表[INT]),所以我rdd1成這個樣子了

rdd1 = [(1, [2]), (1, [3]), (2 , [3]), (2, [4]), (3, [5])]

但這一次達到了同樣的結果我用reduceByKey(_ ::: _)加入所有的鍵值,這應該是更快。你認爲使用這種方法可能會提高性能嗎?我害怕這種類型(Int,List [Int])不是愚蠢的創建一個對,其中的值是一個只包含1個元素的列表?

您是否認爲使用其他方法可以更快地達到相同的結果?謝謝。

+1

我一直想知道同樣的事情,但使用'aggregateByKey'或'combineByKey'來代替,分別使用一個空的List作爲初始化器,然後使用list.add和list.addAll作爲組合器和合並器。這將避免首先創建單元素列表。儘管如此,我相信'groupByKey'已經被優化以在這種情況下更好地工作。 – vefthym

回答

3

我不認爲,如果你的最終結果是,你應該使用reduceByKey

[(1, [2, 3]), (2 , [3, 4]), (3, [5])] 

爲什麼?因爲這是groupByKey是爲什麼,所以它可能是最好的。

groupByKey的問題是,你通常不需要使用相同的密鑰的所有值的列表(或數組)的事,但你可以從這個列表中獲得。如果你真的不需要這個列表,你可以使用reduceByKey來減少與洗牌相同的步驟。

reduceByKey的兩個優點:

  • 它可以改組(減少在相同的執行器的值,以避免不必要的網絡負載)
  • 它從未加載值的整個陣列與之前開始減少同樣的鑰匙進入記憶。這對於巨大的數據集很重要,其中數組可能大幾個GB。

就你的情況而言,如你所示,第一點不是很重要(因爲沒有真正的數據縮減,只是連接),第二點不適用,因爲你想要整個列表。但是,我強烈建議你考慮一下,如果你確實需要整個列表,或者這只是計算中的一個步驟,尤其是在處理大型數據集時。

+0

我正在GraphX中使用圖表工作,因此列表中的列表都是VertexId,所以是的,我需要列表。是的,這個答案讓人想到。 – Matt

3

由於我正在努力提高性能,我想知道如果我的方法來刪除所有groupByKey調用是有效的。

查看RDD.toDebugString查看RDD轉換的邏輯計劃。這應該能讓你很好的瞭解你的行動有多快(或不是)。

避免ShuffledRDD,因爲它們產生通常非常昂貴的洗牌操作。

至於你使用reduceByKey的想法,可以考慮前面的例如keyBy

rdd.keyBy(_.kind).reduceByKey(....) 

您也可以考慮aggregateByKey作爲最普遍的轉變(即坐在後面groupBy和親戚)。

最後但並非最不重要,groupBy有兩個變種,允許定義分區數量或Partitioner。這些可以避免昂貴的洗牌。

請閱讀org.apache.spark.rdd.PairRDDFunctions

使用網頁界面更好地瞭解「查詢」的性能。瞭解你的數據將會有很大的幫助。花費足夠的時間(因爲優化查詢的時間可能會被浪費)。

+1

'keyBy'從非對RDD創建(K,V)元組。 OP的RDD是鍵值RDD,所以他不必使用keyBy :)其次,groupByKey可能非常低效,而不是減少值。在OP的問題中,我們可以看到他減少了值,所以應該使用reduceByKey,而不是groupByKey –

1

可能有點遲來回答這個問題。它可能會幫助其他人。

val tuples = List((1, 2), (1, 3), (2 , 3), (2, 4), (3, 5)) 
val context = getContext() // get Spark Context. 
val tuplesRDD = context.parallelize(tuples) 

val list = mutable.MutableList.empty[Int] 
val addItemsToList = (s: mutable.MutableList[Int], v : Int) => s += v 
val mergeLists = (x: mutable.MutableList[Int], 
        y: mutable.MutableList[Int]) => x ++= y 

val groupByKey = tuplesRDD.aggregateByKey(list)(addItemsToList, mergeLists) 
groupByKey.cache() 
groupByKey.foreach(x => println(x)) 

輸出

(1,MutableList(2,3))
(2,MutableList(3,4))
(3,MutableList(5))

相關問題