2017-09-12 239 views
2

我有星火Scala的一個問題,我想指望從個R dd數據平均,我創建了一個新的RDD這樣,如何計算Spark RDD的平均值?

[(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)] 

我想指望他們這樣,

[(2,(110+130+120)/3),(3,(200+206+206)/3),(4,(150+160+170)/3)] 

那麼,得到這樣的結果,

[(2,120),(3,204),(4,160)] 

我該怎麼用RDD的scala做這件事? 我用火花版本1.6

回答

1

您可以使用aggregateByKey。

val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170))) 
val agg_rdd = rdd.aggregateByKey((0,0))((acc, value) => (acc._1 + value, acc._2 + 1),(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) 
val sum = agg_rdd.mapValues(x => (x._1/x._2)) 
sum.collect 
+0

感謝alexgids,Akash Sethi,vdep。三種方法都可以使用。謝謝! – lee

1

您可以在此case.like使用groupByKey

val rdd = spark.sparkContext.parallelize(List((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170))) 
val processedRDD = rdd.groupByKey.mapValues{iterator => iterator.sum/iterator.size} 
processedRDD.collect.toList 

這裏,groupByKey將返回RDD[(Int, Iterator[Int])],那麼你可以簡單地在Iterator

希望這適用於平均操作適合你

謝謝

+0

它會強烈建議aggregateByKey或combineByKey。你的解決方案是好的,但可能會導致內存不足錯誤 –

+0

謝謝@T.Gawęda談到優化是的,我的代碼缺乏那裏我認爲它是小的scenerio –

1

您可以使用.combineByKey()來計算平均:

val data = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170))) 

val sumCountPair = data.combineByKey((x: Int) => (x.toDouble,1), 
            (pair1: (Double, Int), x: Int) => (pair1._1 + x, pair1._2 + 1), 
            (pair1: (Double, Int), pair2: (Double, Int)) => (pair1._1 + pair2._1, pair1._2 + pair2._2)) 

val average = sumCountPair.map(x => (x._1, (x._2._1/x._2._2))) 
average.collect() 

這裏sumCountPair返回類型爲RDD[(Int, (Double, Int))],表示:(Key, (SumValue, CountValue))。下一步只是將數除以總數並返回(Key, AverageValue)

+1

你和亞歷克斯的答案是最好的 - 他們不需要在一個節點上分組的每個元素 –