我想知道Spark將在哪些情況下作爲UDAF函數的一部分執行合併。什麼時候合併發生在用戶定義的聚合函數中Spark中的UDAF
動機: 我在Spark項目的窗口中使用了很多UDAF函數。我經常想回答這樣一個問題:
信用卡交易在同一個國家與30天窗口中的當前交易進行了多少次?
該窗口將從當前事務開始,但不會將其包括在計數中。它需要通過當前交易的價值來了解過去30天內哪個國家/地區的數量。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
我寫了我的customUDAF來做計數。我總是使用.orderBy(orderByColumn.desc)
並感謝.desc
當前交易在計算期間在窗口中首先顯示。
UDAF函數需要實現merge
函數,該函數在並行計算中合併兩個中間聚合緩衝區。如果發生任何合併,我的current transaction
對於不同的緩衝區可能會不同,並且UDAF的結果將不正確。
我寫了一個UDAF函數來計算我的數據集合並的數量,並且只保留窗口中的第一個事務與當前事務進行比較。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
當我在本地主用16個CPU與火花2.0.1運行它,也有從未在窗口中的任何兼併和第一筆交易始終是當前事務。這就是我要的。在不久的將來,我將在x100更大的數據集和真正的分佈式Spark羣集上運行我的代碼,並想知道是否可以在那裏發生合併。
問題:
- 在何種情況下/ conditons兼併發生在UDAF?
- 使用orderBy執行Windows有沒有兼併?
- 可以告訴Spark不要做兼併嗎?
謝謝您的澄清。我接受你的答案。對於你的最後一點,我不知道我明白我該怎麼做。你能否詳細說明一下?你如何通過窗口聚合?我按用戶進行分區,按日期排序並計算窗口中發生的當前事務的國家(當前相對於窗口,例如sql中的current_row)的次數。對於每個交易這個國家是不同的。 –