2017-06-16 43 views
0

假設我有類型,如這些:Dataset.groupByKey +無類型聚合函數

case class SomeType(id: String, x: Int, y: Int, payload: String) 
case class Key(x: Int, y: Int) 

然後假設我在Dataset[SomeType]這樣做groupByKey

val input: Dataset[SomeType] = ... 

val grouped: KeyValueGroupedDataset[Key, SomeType] = 
    input.groupByKey(s => Key(s.x, s.y)) 

然後假設我有確定功能我想在聚合中使用哪個字段:

val chooseDistinguisher: SomeType => String = _.id 

A ND現在我想在分組數據集運行聚合的功能,例如,functions.countDistinct,使用由所述函數而獲得的字段:

grouped.agg(
    countDistinct(<something which depends on chooseDistinguisher>).as[Long] 
) 

的問題是,我無法從chooseDistinguisher創建一個UDF,因爲countDistinct接受一個Column,並將一個UDF變成一個Column您需要指定輸入列名,我不能這樣做 - 我不知道KeyValueGroupedDataset的「值」使用哪個名稱。

我覺得應該是有可能的,因爲KeyValueGroupedDataset本身有類似的功能:

def count(): Dataset[(K, Long)] = agg(functions.count("*").as(ExpressionEncoder[Long]())) 

但是,這種方法欺騙了一下,因爲它使用"*"作爲列名,但我需要指定特定列(即鍵值分組數據集中「值」的列)。另外,當您使用typed對象中的鍵入函數時,您也不需要指定列名稱,並且它以某種方式工作。

那麼,有沒有可能做到這一點,如果是這樣,如何做到這一點?

+1

聽起來有點像一個宏的工作,雖然混合'DataFrames'和靜態類型只會給你帶來麻煩。另外,根本不要使用SQL,並使用['Aggregator'](https://stackoverflow.com/a/32101530/1560062)。 – zero323

+0

問題是,我想避免重新實現已經存在的所有聚合函數,所以我不想創建自定義聚合器。至於混合數據框和靜態類型,好吧,在Spark 2.0中,DataFrame只是'Dataset [Row]'的別名,所以我希望只要提供了正確的類型信息,對數據框起作用的所有東西都應該同樣適用於數據集。 –

+0

這就是爲什麼我想到宏。關於你的期望 - 'DataFrame'是'Dataset'的一個特例,並且大部分'DataFrame'優化,通常不適用於'Dataset'。無框提供了一個更理智的API方法,但仍然遠遠優化。 – zero323

回答

0

因爲我知道這是不可能與agg轉型,它預計TypedColumn類型是基於Column類型使用as方法構建的,因此您需要從不是類型安全表達式開始。如果有人知道解決方案,我很想看到它... ...

如果需要使用類型安全的集合,你可以使用以下方法一個:

  • mapGroups - 在這裏你可以實現斯卡拉功能負責對於聚集Iterator
  • 實現自定義Aggregator上述

第一種方法需要更少的代碼的建議,所以下面我展示簡單的例子:

def countDistinct[T](values: Iterator[T])(chooseDistinguisher: T => String): Long = 
    values.map(chooseDistinguisher).toSeq.distinct.size 

ds 
    .groupByKey(s => Key(s.x, s.y)) 
    .mapGroups((k,vs) => (k, countDistinct(vs)(_.name))) 

在我看來,Spark數據集類型安全的API仍然遠不及類型安全的DataFrame API成熟。前一段時間,我認爲爲Spark數據集實現簡單易用的類型安全聚合API是個好主意。

0

目前,該用例最好使用DataFrame來處理,您可以稍後將其轉換回Dataset[A]

// Code assumes SQLContext implicits are present 
import org.apache.spark.sql.{functions => f} 

val colName = "id" 
ds.toDF 
    .withColumn("key", f.concat('x, f.lit(":"), 'y)) 
    .groupBy('key) 
    .agg(countDistinct(f.col(colName)).as("cntd"))