2015-02-09 29 views
7

我已經實現了一個解決方案,通過密鑰對RDD[K, V]組進行分組,並根據每個組(K, RDD[V])使用partitionByPartitioner計算數據。儘管如此,我不確定它是否真的有效率,我想要有你的觀點。使用PartitionBy通過密鑰拆分和有效計算RDD組

下面是一個示例情況:根據[K: Int, V: Int]列表,計算V什麼惡意每組K,知道應當分佈和V值可能是非常大的。這應該給:

List[K, V] => (K, mean(V)) 

簡單的分區程序類:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

分區代碼:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

輸出是:

​​

我的問題是:

  1. 撥打partitionBy時會發生什麼? (對不起,我沒有找到足夠的規格)
  2. 通過分區映射是否真的有效率,知道在我的生產環境中它不會有太多的鍵(如樣本50) 100萬樣本)
  3. paralellize(x.toList)的費用是多少?這是否一致? (我需要RDD輸入mean()
  4. 你會怎樣做自己?

問候

回答

4

您的代碼不應該工作。您不能將SparkContext對象傳遞給執行者。 (這不是Serializable。)我也不明白你爲什麼需要。

要計算平均值,您需要計算總和和計數並取其比率。默認的分區將會很好。

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

這是一種高效的單程計算,可以很好地推廣。

+0

謝謝你的回答,當然它不能工作,我沒有火花編碼技巧的所有反射,我已經被我的本地jvm寵壞了。儘管如此,實際上我不需要計算平均值,但是需要一個複雜的ml方法,而且我需要一個RDD [Vector]。我怎麼能從一個獨特的RDD [Int,Int]中獲得(key,RDD [Vector])列表?我沒有找到解決方案。 – Seb 2015-02-10 09:52:06

+0

我認爲這是一個類似的話題,然後:http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302我不知道你想如何使'矢量'從'Int's。但是,如果您想爲每個密鑰獲取一個RDD,則需要拆分原始的RDD,並在鏈接的答案中對此進行了討論。如果它沒有給你答案,我建議提出另一個問題,或許是對你想要做的事情有一個清晰的,高層次的解釋。 – 2015-02-10 12:36:19