如果你想到剛一些行列,你能先取得所有distinct
值,收集他們的List
並將其轉變成一個BroadCast
。下面,我將展示一個骯髒的例子,請注意,它不能保證輸出將被排序(可能有可能是更好的方法,但是這是我腦海的第一件事):
// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)
val sdd = rdd.map{
case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}
sdd.collect()
// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))
在第二種方法我使用Spark的功能進行排序,在RDD's documentation中可以找到zipWithIndex
和keyBy
的工作方式。
//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
.join(distincts.keyBy(_._1))
.map{
case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
}.collect()
//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))
順便說一句,我用collect
只是爲了可視化的目的,在實際的應用程序,你不應該使用它,除非你確定它適合於驅動程序的內存。
你預計會有多少個不同的分數?數千,數百萬? –
等級是否也被傳入?或者您是否期望等級是從輸入類型派生的?有關將等級應用到索引的評論使我不清楚 – brycemcd
@AlbertoBonsanto,所以有多個案例,排名全部或僅排名前10或20位。我需要支持所有案例。所以答案都是數百萬。 – happybayes