
I have a Dataset with the structure below. How do I unpack multiple keys in a Spark Dataset?

case class Person(age: Int, gender: String, salary: Double) 

I want to determine the average salary by gender and age, so I am grouping the DS by both keys. I ran into two main problems: first, the two keys are packed together into a single key column, but I want to keep them in two separate columns; second, the aggregated column gets a very long name and I cannot figure out how to rename it (apparently as and alias will not work). All of this using the DS API.

import org.apache.spark.sql.expressions.scalalang.typed

val df = sc.parallelize(List(Person(27, "male", 100000.00),
    Person(27, "male", 120000.00),
    Person(26, "male", 95000.00),
    Person(31, "female", 89000.00),
    Person(51, "female", 250000.00),
    Person(51, "female", 120000.00)
)).toDF.as[Person]

df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()

+-----------+------------------------------------------------------------------------------------------------+
|        key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|
+-----------+------------------------------------------------------------------------------------------------+
|[female,31]| 89000.0...
|[female,51]| 185000.0...
|  [male,27]| 110000.0...
|  [male,26]| 95000.0...
+-----------+------------------------------------------------------------------------------------------------+

Answers

An alias is an untyped action, so you have to re-type it afterwards. And the only way to unpack the key is to do it afterwards, with a select or similar:

df.groupByKey(p => (p.gender, p.age))
    .agg(typed.avg[Person](_.salary).as("average_salary").as[Double]) // rename, then re-type
    .select($"key._1", $"key._2", $"average_salary") // unpack the composite key
    .show()

The easiest way to achieve both goals is to map() the aggregation result (a Dataset[((String, Int), Double)]) back into Person instances:

.map{case ((gender, age), salary) => Person(gender, age, salary)} 

The result looks best if you slightly rearrange the order of the parameters in the case class's constructor:

case class Person(gender: String, age: Int, salary: Double)

+------+---+--------+
|gender|age|  salary|
+------+---+--------+
|female| 31| 89000.0|
|female| 51|185000.0|
|  male| 27|110000.0|
|  male| 26| 95000.0|
+------+---+--------+

Full code:

import session.implicits._ 
val df = session.sparkContext.parallelize(List(
    Person("male", 27, 100000), 
    Person("male", 27, 120000), 
    Person("male", 26, 95000), 
    Person("female", 31, 89000), 
    Person("female", 51, 250000), 
    Person("female", 51, 120000) 
)).toDS 

import org.apache.spark.sql.expressions.scalalang.typed 
df.groupByKey(p => (p.gender, p.age)) 
    .agg(typed.avg(_.salary)) 
    .map{case ((gender, age), salary) => Person(gender, age, salary)} // unpack the key tuple and rename in one step
    .show() 
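For contrast, and stepping outside the typed API that the question asks about: the untyped DataFrame functions produce named key columns and accept a plain alias directly. A sketch with the standard groupBy/avg, assuming the same df as above:

import org.apache.spark.sql.functions.avg

df.groupBy($"gender", $"age")
    .agg(avg($"salary").as("average_salary"))
    .show()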