2016-04-28 84 views
0

我想要做的火花排名如下:apache spark scala中的排序和排名?

輸入:

5.6 
5.6 
5.6 
6.2 
8.1 
5.5 
5.5 

隊伍:

1 
1 
1 
2 
3 
0 
0 
0 

輸出:

Rank Input 
0  5.5 
0  5.5 
1  5.6 
1  5.6 
1  5.6 
2  6.2 
3  8.1 

我想知道我可以如何在火花中排序並獲得與上面列出的相同的排名。的要求是:

  1. 排名爲0而不是1
  2. 開始,這是數以百萬計的記錄和一個分區的樣品的情況下可以非常大 - 我很欣賞上建議如何使用內部排序方法
  3. 排名

我想在scala中做到這一點。有人可以幫我寫代碼嗎?

+0

你預計會有多少個不同的分數?數千,數百萬? –

+0

等級是否也被傳入?或者您是否期望等級是從輸入類型派生的?有關將等級應用到索引的評論使我不清楚 – brycemcd

+0

@AlbertoBonsanto,所以有多個案例,排名全部或僅排名前10或20位。我需要支持所有案例。所以答案都是數百萬。 – happybayes

回答

2

如果你想到剛一些行列,你能先取得所有distinct值,收集他們的List並將其轉變成一個BroadCast。下面,我將展示一個骯髒的例子,請注意,它不能保證輸出將被排序(可能有可能是更好的方法,但是這是我腦海的第一件事):

// Case 1. k is small (fits in the driver and nodes) 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.collect.sortBy(x => x) 
val broadcast = sc.broadcast(distincts) 

val sdd = rdd.map{ 
    case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i) 
} 

sdd.collect() 

// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2)) 

在第二種方法我使用Spark的功能進行排序,在RDD's documentation中可以找到zipWithIndexkeyBy的工作方式。

//case 2. k is big, distinct values don't fit in the Driver. 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex 
rdd.keyBy(x => x) 
    .join(distincts.keyBy(_._1)) 
    .map{ 
    case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value) 
    }.collect() 

//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2)) 

順便說一句,我用collect只是爲了可視化的目的,在實際的應用程序,你不應該使用它,除非你確定它適合於驅動程序的內存。

+0

非常感謝你。它提供了預期的結果。由於我更關心性能,所以我想了解內部結構如何進行排序。在這種情況下,如果一個密鑰有100k條記錄,分區將是巨大的,所以想知道sortby是使用庫的唯一選擇或任何建議。我在使用Numpy的python中使用了相同的功能,並且排序非常好。看起來類似。 – happybayes

+0

問題是,如果k很大,spark如何執行排序的方式是在分區間移動多個值,這實際上效率很低;但我會將其添加爲案例2. –

+0

非常感謝,幫助我學習這一點。非常感謝它。 – happybayes