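You can also create a custom aggregate function (UDAF) and use it inside a window function.
Something like this (written freehand, so there may be some errors):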
package com.myuadfs
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class MyUDAF() extends UserDefinedAggregateFunction {
  // One input column: the periodic return
  def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))
  // One buffer slot: the running compounded value
  def bufferSchema: StructType = StructType(Array(StructField("compounded", DoubleType)))
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 1.0 // start compounding from 1
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) * (input.getDouble(0) + 1)
  }
  // Merges two partial aggregation buffers. Partial products combine by
  // multiplication (not addition), so this also stays correct in a regular
  // groupBy aggregation. Inside a window it should not be called at all.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
  }
  def evaluate(buffer: Row): Double = {
    buffer.getDouble(0)
  }
}
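To sanity-check the buffer logic outside Spark, here is a minimal plain-Scala model of the same initialize/update/merge/evaluate cycle, assuming simple periodic returns as in `update`. `CompoundBuffer` and the sample returns are hypothetical. Aggregating two halves separately and merging the partial products should match a single pass, which is why `merge` multiplies: adding the partials for the sample below (0.98 + 1.0403) would give 2.0203 instead of about 1.0195.

```scala
// Plain-Scala model of the UDAF buffer lifecycle (no Spark needed).
// CompoundBuffer is a hypothetical name used only for illustration.
object CompoundBuffer {
  // initialize: the empty product is 1.0
  def initialize: Double = 1.0

  // update: fold one periodic return r into the running product
  def update(buffer: Double, r: Double): Double = buffer * (1.0 + r)

  // merge: two partial products combine by multiplication
  def merge(b1: Double, b2: Double): Double = b1 * b2

  // evaluate: the buffer already holds the compounded value
  def evaluate(buffer: Double): Double = buffer

  // One pass over all returns (what a single task would do)
  def aggregate(returns: Seq[Double]): Double =
    evaluate(returns.foldLeft(initialize)(update))

  // Aggregate two halves separately, then merge the partial buffers
  // (what a distributed groupBy would do); matches aggregate() up to rounding
  def aggregateSplit(returns: Seq[Double], at: Int): Double = {
    val (left, right) = returns.splitAt(at)
    val partialLeft = left.foldLeft(initialize)(update)
    val partialRight = right.foldLeft(initialize)(update)
    evaluate(merge(partialLeft, partialRight))
  }

  def main(args: Array[String]): Unit = {
    val returns = Seq(-0.02, 0.01, 0.03) // hypothetical sample returns
    println(aggregate(returns))
    println(aggregateSplit(returns, 1))
  }
}
```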
Now you can use it inside a window function, like this:
import org.apache.spark.sql.expressions.Window
import com.myuadfs.MyUDAF

val compound = new MyUDAF()
val windowSpec = Window.orderBy("date")
val newDF = df.withColumn("compounded", compound(df("Return")).over(windowSpec))
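Note that without a partitionBy, this entire calculation has to fit in a single partition, so if your data is too large you will run into problems. Normally, operations like this are performed after partitioning by some key (e.g., by adding partitionBy to the window spec); then each partition only has to hold the rows for a single key.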
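With partitionBy, each key is compounded independently in date order. The following is a rough plain-Scala model of what `Window.partitionBy("key").orderBy("date")` combined with the UDAF computes. It assumes a hypothetical `Obs` record with a `key`, an ISO-formatted `date` string (so lexicographic order equals date order), and a return `ret`; none of these names come from the original data.

```scala
// Plain-Scala model of a running compounded return per key.
// Obs and its fields are hypothetical, for illustration only.
object RunningCompound {
  final case class Obs(key: String, date: String, ret: Double)

  // For each row: product of (1 + ret) over all earlier rows with the same
  // key, plus the row itself (like a window from the first row to the current one)
  def compoundByKey(rows: Seq[Obs]): Seq[(Obs, Double)] =
    rows
      .groupBy(_.key)                       // like partitionBy("key")
      .toSeq
      .sortBy(_._1)                         // deterministic output order by key
      .flatMap { case (_, group) =>
        val ordered = group.sortBy(_.date)  // like orderBy("date")
        // scanLeft starts at 1.0 (the UDAF's initialize); drop that seed
        val running = ordered.scanLeft(1.0)((acc, o) => acc * (1.0 + o.ret)).tail
        ordered.zip(running)
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Obs("A", "2017-01-02", 0.01),
      Obs("A", "2017-01-01", -0.02),
      Obs("B", "2017-01-01", 0.05)
    )
    compoundByKey(rows).foreach { case (o, c) => println(s"${o.key} ${o.date} $c") }
  }
}
```

Because each key's rows are processed independently, Spark can place different keys in different partitions instead of pulling everything into one.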
Is your sample correct? In the second row, shouldn't the value of Compounded be 0.98?
Yes, it should be 0.98. Sorry, my mistake.