2017-09-23 87 views
0

的特定領域的排名:火花產生於我已經有了一個<code>rdd</code>作爲計算研究結果讓我們說這是因爲以下格式RDD

(uid, factor, name, avatar, gender, otherFactor1, otherFactor2) 

,現在我想的RDD被factor進行排序,並進行現場像rank指示記錄的軍銜,後來使用foreach每一個記錄寫入到數據庫

我知道我被強權此:

rdd.sortBy{ 
    case (uid, factor, name, avatar, gender, otherFactor1, otherFactor2) => { 
     factor 
    } 
}.foreach{ 
    //how could I insert a rank field by the index of the loop? 
} 

在這裏,我對如何通過foreach循環的指數

任何想法添加rank場卡住?

+2

'rdd.sortBy(_._ 2).zipWithIndex'? – philantrovert

+0

@philantrovert,回答下面的問題:) –

+0

@RameshMaharjan完成。 OP現在可以關閉該問題。 – philantrovert

回答

2

正如評論所說,你可以使用

rdd.sortBy(_._2).zipWithIndex 

你可以用它展平到一個更體面的結構:

rdd.sortBy(_._2).zipWithIndex.map { 
    case ((uid, factor, name, avatar, gender, otherFactor1, otherFactor2), rank) => 
    (uid, factor, name, avatar, gender, otherFactor1, otherFactor2, rank) 
} 

有一件事你可能要注意有關zipWithIndex,從the source code for RDD.scala

此方法需要在此RDD連續時觸發點火作業超過一個分區。

如果你想避免這種情況,你可以使用zipWithUniqueId但我不認爲它給每個元素的連續索引。

0

看看下面是否有幫助。

case class ItemInfo(item:String, quantity:Int) 
val data = sc.parallelize(List(("a",10),("b",20),("c",30))) 
val ItemDF = data.map(x=> ItemInfo(x._1,x._2)).toDF() 
ItemDF.registerTempTable("Item_tbl") 
val rankedItems = sqlContext.sql("select item, quantity, rank() over(order by quantity desc) as rank from Item_tbl") 
rankedItems.collect().foreach(println) 

本示例根據數量對項目進行排序。

+0

抱歉,還沒有嘗試使用spark sql的配置單元方式,'zipWithIndex'被@philantrovert評論會做的伎倆 – armnotstrong

相關問題