2016-08-08 53 views
-5

鑑於以下RDD:阿帕奇火花 - 斯卡拉 - ReduceByKey - 連鍵重複高達兩倍僅

val vectors = RDD [String, Int] = ((k1,v1),(k1,v2),(k2,v3),...) 
其中鍵出現兩次或者(K1)或一次(K2)

,從未不止於此。我想:

val uniqVectors = RDD[String, Int] = ((k1, v1*v2), (k2, v3), ...) 

一種方法是使用reduceByKey:

val uniqVectors = vectors.reduceByKey((a,b) => a*b) 

然而,它與7B元素陣列太慢。 在這個特定情況下有沒有更快的方法?

+0

呃...它不慢...因爲7B元素而需要花費時間。 –

+0

是的!我明白。但考慮我正在使用2kgb ram和50個內核的機器。此外,我正在尋找更快的解決方案,在這種情況下,凱斯不會重複一次以上。 – kambiz

+0

那麼......執行速度不僅取決於內存,還取決於CPU內核的數量。在Spark集羣上進行調度時增加執行程序的數量,並在創建rdd時增加分區數量。 –

回答

0

什麼(可能)花費的時間在這裏被洗牌的數據:當你想組的兩個或多個記錄,它們必須駐留在同一個分區中,所以星火必須先洗牌的記錄,使所有記錄相同的密鑰在一個分區中。現在

,即使每個按鍵都有兩個記錄最多,這洗牌將要發生,除非你能以某種方式保證每個密鑰已包含在單個分區 - 例如,如果您加載此RDD從HDFS和你不知何故知道,每個密鑰駐留在單個文件部分開始。在這種(不可能的)的情況下,你可以使用mapPartitions執行上的每個分區分組自己分開,從而節省了洗牌:

vectors.mapPartitions( 
    iter => iter.toList.groupBy(_._1).map { case (k, list) => (k, list.map(_._2).reduce(_ * _)) }.iterator, 
    preservesPartitioning = true) 

無的,這是特殊到每個鍵的最大重複是2的情況下, 順便一提。