2017-09-14 72 views
0

下面是我在做什麼:爲什麼repartitionAndSortWithinPartitions不能排序?

val rddkv = sc.parallelize(List(("k1",1),("k2",2),("k1",2),("k3",5),("k3",1))) 
    //rddkv.collect 
    //Array[(String, Int)] = Array((k1,1), (k2,2), (k1,2), (k3,5), (k3,1)) 

rddkv.repartitionAndSortWithinPartitions(new org.apache.spark.RangePartitioner(3,rddkv)).mapPartitionsWithIndex((i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect 
    //Array[String] = Array(" index=0 value=(k1,1)", " index=0 value=(k1,2)", " index=1 value=(k2,2)", " index=1 value=(k3,5)", " index=1 value=(k3,1)") 

注意分區中的值進行排序。這是爲什麼?我錯過了什麼?

回答

1

RDD實際上是排序的,您可能誤解了方法OrderedRDDFunctions.repartitionAndSortWithinPartitions的工作原理。該方法對鍵值對(K,V)的RDD進行操作,其中K是關鍵,V是值。它將重新分區,然後通過密鑰對數據進行排序。

看你的輸出順序:(k1,1), (k1,2), (k2,2), (k3,5), (k3,1),它被正確排序後的關鍵。

如果你只是想排序值,忽略它們在哪個分區,你可以簡單地做rdd.sortBy(_._2)