2015-10-05 101 views
1

如何基於火花階現有列中添加新列

我做打造阿帕奇火花使用Mllib ALS的建議,與輸出

user | product | rating 
    1 | 20 | 0.002 
    1 | 30 | 0.001 
    1 | 10 | 0.003 
    2 | 20 | 0.002 
    2 | 30 | 0.001 
    2 | 10 | 0.003 

,但我需要改變的數據結構基礎按類別排序,如下:

user | product | rating | number_rangking 
    1 | 10 | 0.003 | 1 
    1 | 20 | 0.002 | 2 
    1 | 30 | 0.001 | 3 
    2 | 10 | 0.002 | 1 
    2 | 20 | 0.001 | 2 
    2 | 30 | 0.003 | 3 

我該怎麼做?也許任何一個都可以給我一個線索...

THX

回答

1

所有你需要的是取決於細節的窗口功能,您選擇使用rankrowNumber

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rank 

val w = Window.partitionBy($"user").orderBy($"rating".desc) 

df.select($"*", rank.over(w).alias("number_rangking")).show 
// +----+-------+------+---------------+ 
// |user|product|rating|number_rangking| 
// +----+-------+------+---------------+ 
// | 1|  10| 0.003|    1| 
// | 1|  20| 0.002|    2| 
// | 1|  30| 0.001|    3| 
// | 2|  10| 0.003|    1| 
// | 2|  20| 0.002|    2| 
// | 2|  30| 0.001|    3| 
// +----+-------+------+---------------+ 

使用普通RDD可以groupByKey,本地處理和flatMap

rdd 
    // Convert to PairRDD 
    .map{case (user, product, rating) => (user, (product, rating))} 
    .groupByKey 
    .flatMap{case (user, vals) => vals.toArray 
    .sortBy(-_._2) // Sort by rating 
    .zipWithIndex // Add index 
    // Yield final values 
    .map{case ((product, rating), idx) => (user, product, rating, idx + 1)}}