假設我有類型,如這些:Dataset.groupByKey +無類型聚合函數
case class SomeType(id: String, x: Int, y: Int, payload: String)
case class Key(x: Int, y: Int)
然後假設我在Dataset[SomeType]
這樣做groupByKey
:
val input: Dataset[SomeType] = ...
val grouped: KeyValueGroupedDataset[Key, SomeType] =
input.groupByKey(s => Key(s.x, s.y))
然後假設我有確定功能我想在聚合中使用哪個字段:
val chooseDistinguisher: SomeType => String = _.id
A ND現在我想在分組數據集運行聚合的功能,例如,functions.countDistinct
,使用由所述函數而獲得的字段:
grouped.agg(
countDistinct(<something which depends on chooseDistinguisher>).as[Long]
)
的問題是,我無法從chooseDistinguisher
創建一個UDF,因爲countDistinct
接受一個Column
,並將一個UDF變成一個Column
您需要指定輸入列名,我不能這樣做 - 我不知道KeyValueGroupedDataset
的「值」使用哪個名稱。
我覺得應該是有可能的,因爲KeyValueGroupedDataset
本身有類似的功能:
def count(): Dataset[(K, Long)] = agg(functions.count("*").as(ExpressionEncoder[Long]()))
但是,這種方法欺騙了一下,因爲它使用"*"
作爲列名,但我需要指定特定列(即鍵值分組數據集中「值」的列)。另外,當您使用typed
對象中的鍵入函數時,您也不需要指定列名稱,並且它以某種方式工作。
那麼,有沒有可能做到這一點,如果是這樣,如何做到這一點?
聽起來有點像一個宏的工作,雖然混合'DataFrames'和靜態類型只會給你帶來麻煩。另外,根本不要使用SQL,並使用['Aggregator'](https://stackoverflow.com/a/32101530/1560062)。 – zero323
問題是,我想避免重新實現已經存在的所有聚合函數,所以我不想創建自定義聚合器。至於混合數據框和靜態類型,好吧,在Spark 2.0中,DataFrame只是'Dataset [Row]'的別名,所以我希望只要提供了正確的類型信息,對數據框起作用的所有東西都應該同樣適用於數據集。 –
這就是爲什麼我想到宏。關於你的期望 - 'DataFrame'是'Dataset'的一個特例,並且大部分'DataFrame'優化,通常不適用於'Dataset'。無框提供了一個更理智的API方法,但仍然遠遠優化。 – zero323