2016-03-10 57 views
0

後我有一個RDD P映射到類:火花:TOPN的GroupBy

case class MyRating(userId:Int, itemId:Int, rating:Double) 

我對尋找爲每個用戶即的GroupBy 用戶id並且每個形成的組內TOPN條目,濾出TopN(例如10)條目基於最高評分

我做了以下內容:

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey 
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false)) 
val C = values.groupByKey.take(10) 

顯然應用。取(10)groupByKey離開後,只有10個鍵(用戶),我並不會篩選出每個用戶的TOP10評級。

我們如何去應用.take(N)後面的groupBy,以便它作用於某個部分的值而不是key本身?

回答

3

天真的方法是採取ñ值:

B.mapValues(_.take(n)) 

但如果你需要的值只有小部分會更好地使用例如aggregateByKey和運行,而不是分組丟棄過時的記錄一切。你可能希望要的東西在實踐中更有效的(你可以檢查top/takeOrdered星火實現),但你可以像這樣開始:

import scala.math.Ordering 
import scala.collection.mutable.PriorityQueue 

implicit val ord = Ordering.by[MyRating, Double](_.rating) 

val pairs = rdd.keyBy(_.userId) 
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
    (acc, x) => { 
    acc.enqueue(x) 
    acc.take(n) 
    }, 
    (acc1, acc2) => (acc1 ++ acc2).take(n) 
) 

注意,上面的代碼需要斯卡拉2.11+由於SI-7568

+0

謝謝,aggregateByKey更有意義..看起來像一個強大的構造。 – srbhkmr

2

如果我理解正確的話,你需要做的是: 組RDD通過用戶ID,然後爲每個(ID,列表)元組給予回覆的ID和排序,並修剪成10個元素的列表

P 
    .groupBy(_.userId) 
    .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
    } 
1

您非常接近,但您需要在A到B的映射範圍內取前N個條目。例如,如果您想從列表中取得前2個MyRating項目,則下面的代碼將執行招。 B將是一個RDD,其中包含每個userId的前兩名MyRating的列表。 (此外,sortBy函數將簡單地通過使評級爲負)工作。

case class MyRating(userId:Int, itemId:Int, rating:Double) 

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7)) 
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist) 

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey 
val TOPCOUNT = 2 
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT)) 
1

下面是使用aggregateByKey通過zero323的建議爲例:

val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId) 
val B = A.aggregateByKey(List[MyRating]())(
    (l, r) => (l :+ r).sortBy(-_.rating).take(10), 
    (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10)) 

使用這種方法的好處是,你沒有可能打亂你的遺囑執行人之間的大量數據。如果來自單個用戶的評級分佈在多個節點上,則groupBy需要將用戶的所有評級發送給同一個執行者,而使用aggregateByKey首先在每個執行者上建立前N列表,然後只有那些列表被混洗並結合起來。

這是否對您有益取決於數據的分佈。如果你的收視率比最終收視率還要高,那麼你並沒有獲得太多的收視率(尤其是對於每個單獨的收視率進行排序的我的天真實施)。但是,如果每個執行者的評分數量大一個數量級,您就可以贏得很多。

+0

感謝您解釋這兩種方法之間的差異。非常感激。 – srbhkmr