2017-02-14 65 views
0

我使用MLlib評級對象(僅限於(int userId,int productId,雙重評級)的元組)對產品評級進行RDD檢查。我想從RDD中刪除任何評分過低的產品評論。根據出現次數篩選RDD

例如RDD可能是這樣的:

Rating(35, 1, 5.0) 
Rating(18, 1, 4.0) 
Rating(29, 2, 3.0) 
Rating(12, 2, 2.0) 
Rating(65, 3, 1.0) 

,如果我是過濾,以去除任何產品,小於2條的評論,它只是篩選出最後的評級,並給予回到第4位。 (我想用比2更高的最小審閱計數的方式進行過濾,但僅舉個例子)。

目前我有這樣的代碼,其收視率的多少的順序輸出的產品ID的序列,但我不能確定的方式基於從主RDD過濾,似乎低效反正:

val mostRated = ratings.map(_._2.product) 
         .countByValue 
         .toSeq 
         .sortBy(- _._2) 
         .map(_._1) 

回答

0

您可以組RDD通過產品編號,然後過濾它基於如果組的長度大於閾值(這裏1)大。使用flatMap提取從分組的RDD結果:

case class Rating(UserId: Int, ProductId: Int, Rating: Double) 

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0), 
    Rating(18, 1, 4.0), 
    Rating(29, 2, 3.0), 
    Rating(12, 2, 2.0), 
    Rating(65, 3, 1.0))) 

val prodMinCounts = ratings.groupBy(_.ProductId). 
          filter(_._2.toSeq.length > 1). 
          flatMap(_._2) 
prodMinCounts.collect 
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))