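Looking at your requirement, a UDAF (user-defined aggregate function) is the best fit. You can check out databricks and ragrawal for a better understanding.
I am providing guidance based on my understanding of your question, and I hope it is helpful.
First of all, you need to define the UDAF. You should be able to do that once you have read through the links above.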
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

private class ManosAggregateFunction(daysToCheck: Int, countsToCheck: Int) extends UserDefinedAggregateFunction {
  // input columns: the date string (yyyy-MM-dd) and the count for that date
  def inputSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType)
  // the aggregation buffer holds three values: the last date seen,
  // the running count, and the number of days covered so far
  def bufferSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType).add("days", IntegerType)
  // returns a Boolean: whether the running count reached countsToCheck
  def dataType: DataType = BooleanType
  // the same input always gives the same result
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, "")
    buffer.update(1, 0)
    buffer.update(2, 0)
  }
  // stores the current row's date and count in the buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val nowDate = input.getString(0)
    val count = input.getInt(1)
    buffer.update(0, nowDate)
    buffer.update(1, count)
  }
  // assumes buffers arrive newest-first, so each step moves further back in time
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    val previousDate = buffer1.getString(0)
    val nowDate = buffer2.getString(0)
    if (previousDate != "") {
      val oldDate = LocalDate.parse(previousDate, formatter)
      val newDate = LocalDate.parse(nowDate, formatter)
      // add the gap in days between the two dates to the days covered so far
      buffer1.update(2, buffer1.getInt(2) + (oldDate.toEpochDay() - newDate.toEpochDay()).toInt)
    }
    buffer1.update(0, buffer2.getString(0))
    // only count rows that fall within the daysToCheck window
    if (buffer1.getInt(2) < daysToCheck) {
      buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
    }
  }
  // true when the counts inside the window add up to at least countsToCheck
  def evaluate(buffer: Row): Any = {
    countsToCheck <= buffer.getInt(1)
  }
}
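The day arithmetic inside merge can be tried on its own, outside Spark. This is only a minimal sketch: `DayGapSketch` and `dayGap` are hypothetical names introduced for illustration and are not part of the UDAF or of any library.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DayGapSketch {
  // Same pattern as the one used in merge
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Hypothetical helper: number of whole days from `older` to `newer`
  def dayGap(newer: String, older: String): Int = {
    val newerDate = LocalDate.parse(newer, formatter)
    val olderDate = LocalDate.parse(older, formatter)
    (newerDate.toEpochDay - olderDate.toEpochDay).toInt
  }
}
```

For example, `DayGapSketch.dayGap("2017-06-24", "2017-06-22")` gives 2. The gap is positive only when the newer date is passed first, which is why the order of the rows matters to the UDAF.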
In the above UDAF, daysToCheck and countsToCheck are the X and Y from your question.
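To make the rule easier to follow, here is a plain-Scala sketch of what I understand the aggregation is meant to compute for one id: sum the counts that fall within daysToCheck days of the most recent date, then compare that sum with countsToCheck. It runs on a local Seq instead of a DataFrame, and `WindowCheckSketch` and `meetsThreshold` are hypothetical names I made up for this illustration; treating an empty group as false is also my assumption.

```scala
import java.time.LocalDate

object WindowCheckSketch {
  // Hypothetical helper, not part of the UDAF: `rows` are (date, count) pairs
  // for a single id, with dates written as yyyy-MM-dd.
  def meetsThreshold(rows: Seq[(String, Int)], daysToCheck: Int, countsToCheck: Int): Boolean = {
    if (rows.isEmpty) false // assumption: no rows means the threshold is not met
    else {
      val parsed = rows.map { case (date, count) => (LocalDate.parse(date).toEpochDay, count) }
      // most recent date in the group, as an epoch day
      val latest = parsed.map(_._1).max
      // keep only the counts whose date is less than daysToCheck days before the latest one
      val total = parsed.collect { case (day, count) if latest - day < daysToCheck => count }.sum
      total >= countsToCheck
    }
  }
}
```

With daysToCheck = 5 and countsToCheck = 2, the rows ("2017-06-22", 1), ("2017-06-23", 0), ("2017-06-24", 1) give true, while ("2017-06-28", 0), ("2017-06-29", 1) give false.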
You can call the defined UDAF as follows:
val manosAgg = new ManosAggregateFunction(5,2)
df.orderBy($"timestamp".desc).groupBy("id").agg(manosAgg(col("timestamp"), col("count")).as("code")).show
The final output is
+---+-----+
| id| code|
+---+-----+
| 1| true|
| 2|false|
+---+-----+
for the given input
val df = Seq(
(1, "2017-06-22", 1),
(1, "2017-06-23", 0),
(1, "2017-06-24", 1),
(2, "2017-06-28", 0),
(2, "2017-06-29", 1)
).toDF("id","timestamp","count")
+---+----------+-----+
|id |timestamp |count|
+---+----------+-----+
|1 |2017-06-22|1 |
|1 |2017-06-23|0 |
|1 |2017-06-24|1 |
|2 |2017-06-28|0 |
|2 |2017-06-29|1 |
+---+----------+-----+
I hope this gives you an idea of how to solve your problem. :)