
How can I do a groupBy using Datasets? Here is the request I had to implement with an RDD:

val test = Seq(("New York", "Jack"), 
    ("Los Angeles", "Tom"), 
    ("Chicago", "David"), 
    ("Houston", "John"), 
    ("Detroit", "Michael"), 
    ("Chicago", "Andrew"), 
    ("Detroit", "Peter"), 
    ("Detroit", "George") 
) 
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println) 

The result is:

(New York,List(Jack))

(Detroit,List(Michael, Peter, George))

(Los Angeles,List(Tom))

(Houston,List(John))

(Chicago,List(David, Andrew))

How can I do this with a Dataset in Spark 2.0?

I have a way using a custom function, but it feels so complicated. Is there a simpler way?
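
For reference, a rough sketch of what a typed-API version could look like (groupByKey plus mapGroups); this is only illustrative, not the exact custom function mentioned above, and it assumes the spark session and implicits of a Spark 2.0 shell:

import spark.implicits._

// assumes `spark` from a Spark 2.0 shell; `test` is the Seq defined above
val typed = test.toDS()    // Dataset[(String, String)]
val grouped = typed.groupByKey(_._1).mapGroups((city, rows) => (city, rows.map(_._2).toList))
grouped.show(false)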

Answers


I would suggest you start by creating a case class as

case class Monkey(city: String, firstName: String) 

The case class should be defined outside of your main class. Then you can simply use the toDS function and do a groupBy with the aggregation function collect_list, as follows

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 

val test = Seq(("New York", "Jack"), 
    ("Los Angeles", "Tom"), 
    ("Chicago", "David"), 
    ("Houston", "John"), 
    ("Detroit", "Michael"), 
    ("Chicago", "Andrew"), 
    ("Detroit", "Peter"), 
    ("Detroit", "George") 
) 
sc.parallelize(test).map(row => Monkey(row._1, row._2)).toDS().groupBy("city").agg(collect_list("firstName") as "list").show(false) 

You will get the output

+-----------+------------------------+
|city       |list                    |
+-----------+------------------------+
|Los Angeles|[Tom]                   |
|Detroit    |[Michael, Peter, George]|
|Chicago    |[David, Andrew]         |
|Houston    |[John]                  |
|New York   |[Jack]                  |
+-----------+------------------------+

You can always convert back to an RDD by simply calling the .rdd function.
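
For example, a short sketch (monkeys and grouped are just hypothetical names for the steps shown above):

val monkeys = sc.parallelize(test).map(row => Monkey(row._1, row._2)).toDS()
val grouped = monkeys.groupBy("city").agg(collect_list("firstName") as "list")
grouped.rdd.foreach(println)    // grouped.rdd is an RDD[org.apache.spark.sql.Row]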

First, I would turn your RDD into a Dataset:

val spark: org.apache.spark.sql.SparkSession = ??? 
import spark.implicits._ 

val testDs = test.toDS() 

Here you get your column names :) Use them wisely!

testDs.schema.fields.foreach(x => println(x)) 

In the end you just need to use groupBy:

testDs.groupBy("City?", "Name?") 

RDDs are not really the way to go in 2.0, I think. Feel free to ask if you have any questions.
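
For a Dataset built from tuples with toDS(), the default column names are _1 and _2, so a filled-in version of that call might look like this (just a sketch; collect_list comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.collect_list

// _1 is the city and _2 the name in the tuple Dataset
testDs.groupBy("_1").agg(collect_list("_2") as "names").show(false)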


'testDs.columns' can get the column names, without the types, even faster (as an 'Array[String]'). – Garren


Good point! True –
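
The tip from the comment above, as a one-liner:

println(testDs.columns.mkString(", "))    // prints the column names, e.g. _1, _2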


To create a Dataset, first define a case class outside your class, as

case class Employee(city: String, name: String) 

Then you can convert the list to a Dataset as

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
import spark.implicits._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
).toDF("city", "name")

val data = test.as[Employee]

Or

import spark.implicits._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)

val data = test.map(r => Employee(r._1, r._2)).toDS()

Now you can do a groupBy and perform any aggregation, for example

data.groupBy("city").count().show 

data.groupBy("city").agg(collect_list("name")).show 

Hope this helps!