1

我想知道Spark將在哪些情況下作爲UDAF函數的一部分執行合併。什麼時候合併發生在用戶定義的聚合函數中Spark中的UDAF

動機: 我在Spark項目的窗口中使用了很多UDAF函數。我經常想回答這樣一個問題:

信用卡交易在同一個國家與30天窗口中的當前交易進行了多少次?

該窗口將從當前事務開始,但不會將其包括在計數中。它需要通過當前交易的價值來了解過去30天內哪個國家/地區的數量。

val rollingWindow = Window 
     .partitionBy(partitionByColumn) 
     .orderBy(orderByColumn.desc) 
     .rangeBetween(0, windowSize) 

df.withColumn(
    outputColumnName, 
    customUDAF(inputColumn, orderByColumn).over(rollingWindow)) 

我寫了我的customUDAF來做計數。我總是使用.orderBy(orderByColumn.desc)並感謝.desc當前交易在計算期間在窗口中首先顯示。

UDAF函數需要實現merge函數,該函數在並行計算中合併兩個中間聚合緩衝區。如果發生任何合併,我的current transaction對於不同的緩衝區可能會不同,並且UDAF的結果將不正確。

我寫了一個UDAF函數來計算我的數據集合並的數量,並且只保留窗口中的第一個事務與當前事務進行比較。

class FirstUDAF() extends UserDefinedAggregateFunction { 

    def inputSchema = new StructType().add("x", StringType) 
    .add("y", StringType) 

    def bufferSchema = new StructType() 
    .add("first", StringType) 
    .add("numMerge", IntegerType) 

    def dataType = new StructType() 
    .add("firstCode", StringType) 
    .add("numMerge", IntegerType) 

    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = { 
    buffer(0) = "" 
    buffer(1) = 1 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    if (buffer.getString(0) == "") 
     buffer(0) = input.getString(0) 

    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1) 
    } 

    def evaluate(buffer: Row) = buffer 
} 

當我在本地主用16個CPU與火花2.0.1運行它,也有從未在窗口中的任何兼併和第一筆交易始終是當前事務。這就是我要的。在不久的將來,我將在x100更大的數據集和真正的分佈式Spark羣集上運行我的代碼,並想知道是否可以在那裏發生合併。

問題:

  • 在何種情況下/ conditons兼併發生在UDAF?
  • 使用orderBy執行Windows有沒有兼併?
  • 可以告訴Spark不要做兼併嗎?

回答

1

在UDAF中發生哪些情況/條件合併?

merge在聚合函數(「map side aggregation」)的部分應用程序在shuffle(「reduce side aggregation」)之後合併時調用。

使用orderBy執行Windows有沒有兼併?

當前執行從來沒有。至於窗口函數只是看中了groupByKey,並沒有部分聚合。這當然是實施細節,將來可能會在沒有進一步通知的情況下進行更改。

是否有可能告訴Spark不做兼併?

不是。但是,如果數據已通過聚合密鑰分區,則不需要merge,只使用combine

最後:

多少次信用卡交易是在同一個國家提出在30天的窗口中的當前交易?

不要求UDAFs或窗口函數。我可能會用o.a.s.sql.functions.window創建滾動窗口,按用戶,國家和窗口進行彙總,然後返回輸入。

+0

謝謝您的澄清。我接受你的答案。對於你的最後一點,我不知道我明白我該怎麼做。你能否詳細說明一下?你如何通過窗口聚合?我按用戶進行分區,按日期排序並計算窗口中發生的當前事務的國家(當前相對於窗口,例如sql中的current_row)的次數。對於每個交易這個國家是不同的。 –

相關問題