2017-07-04 49 views
0

我有RDD [(中間體,數組[雙])] 例如:排序RDD根據一個Array()內容

1, Array(2.0,5.0,6.3) 
5, Array(1.0,3.3,9.5) 
1, Array(5.0,4.2,3.1) 
2, Array(9.6,6.3,2.3) 
1, Array(8.5,2.5,1.2) 
5, Array(6.0,2.4,7.8) 
2, Array(7.8,9.1,4.2) 

我必須收集第1列的不同值並安排在整個根據該陣列的RDD。

val label_array = rdd.map(_._1).collect.distinct 

輸出:陣列(1,5,2),我現在有根據label_array到數據安排。

所需的輸出

1, Array(2.0,5.0,6.3) 
1, Array(5.0,4.2,3.1) 
1, Array(8.5,2.5,1.2) 
5, Array(1.0,3.3,9.5) 
5, Array(6.0,2.4,7.8) 
2, Array(9.6,6.3,2.3) 
2, Array(7.8,9.1,4.2) 

我已經試過

val ordering = (1,5,2).productIterator.toList.zipWithIndex.toMap 
rdd.sortBy{case (k,v) => ordering(k)} 

但如何獲得所需的輸出作爲數組將被改變(要素和大小的區別)。我如何根據數組格式對RDD進行排序?

回答

0

只是zipWithIndexlabel_array,你應該罰款

val ordering = label_array.zipWithIndex.map(x => (x._1, x._2)).toMap 

而且你應該有你的ordering地圖

scala.collection.immutable.Map[Int,Int] = Map(1 -> 0, 5 -> 1, 2 -> 2) 
0

更簡單的方法是創建一個新的RDD具有鮮明的第1列,並與加盟以前的專欄

下面是簡單的例子

val rdd = spark.sparkContext.parallelize(Seq(
     (1, Array(2.0,5.0,6.3)), 
     (5, Array(1.0,3.3,9.5)), 
     (1, Array(5.0,4.2,3.1)), 
     (2, Array(9.6,6.3,2.3)), 
     (1, Array(8.5,2.5,1.2)), 
     (5, Array(6.0,2.4,7.8)), 
     (2, Array(7.8,9.1,4.2)) 
    ) 
    ) 

    val distinct = rdd.map(v => (v._1, 1))distinct() 
    //(v._1, 1)this is done because you need key value to join 

    //now join distinct with previous original RDD 
    distinct.join(rdd).map(v => (v._1, v._2._2)) 

輸出:

1, Array(2.0,5.0,6.3) 
1, Array(5.0,4.2,3.1) 
1, Array(8.5,2.5,1.2) 
5, Array(1.0,3.3,9.5) 
5, Array(6.0,2.4,7.8) 
2, Array(9.6,6.3,2.3) 
2, Array(7.8,9.1,4.2)