這是來自here的後續問題。我正在嘗試基於這個implementation實現k-means。它很好,,但我想取代groupByKey()
與reduceByKey()
,但我不知道如何(我不擔心現在的表現)。下面是相關縮小的代碼:用reduceByKey替換groupByKey()
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val read_mean_centroids = sc.textFile("centroids.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
..
注意println(newCentroids)
會給:
地圖(23 - >(-6.269305E-4,-0.0011746404,-4.08004E-5),8 - >(-5.108732E-4,7.336348E-4,-3.707591E-4),17 - >(-0.0016383086,-0.0016974678,1.45 ..
println(closest)
和:
MapPartitionsRDD [6] at map at kmeans.scala:75
相關問題:Using reduceByKey in Apache Spark (Scala)。
DEF reduceByKey(FUNC:(V,V)⇒V):RDD [(K,V)]
合併的值用於每個鍵使用關聯減少函數。
DEF reduceByKey(FUNC:(V,V)⇒V,numPartitions:智力):RDD [(K,V)]
合併使用的締合減少函數中的每個鍵的值。
DEF reduceByKey(分割器:分區程序,FUNC:(V,V)⇒V):RDD [(K,V)]
使用關聯減少功能合併用於每個鍵的值。
DEF groupByKey():RDD [(K,可迭代[V])]
組在RDD每個鍵成一個單一的序列值。
工作就像一個魅力!你能解釋我們在這裏做了什麼嗎?我的意思是爲什麼我想用reduceByKey()替換groupByKey()?這樣做的主要優點是什麼?相關:http://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work – gsamaras
好吧,'groupByKey'會導致一堆東西被髮送到各個節點之間即所有與給定鍵相關的值,用於所有鍵和數據的部分。另一方面,使用'aggregateByKey'方法,每個部分只負責向(向駕駛員)傳送由總和和計數組成的對。這麼少的網絡通信以及無需創建所有這些值的集合(因爲它只是它們的總和和數量在計算平均值時很重要)。 –
好吧,這就是我的想法,非常感謝! – gsamaras