我希望通過窗口期執行結構化流式聚合。鑑於以下數據模式。目標是根據用戶最近發生的事件進行過濾。然後彙總每個位置的每種事件類型的計數。Spark結構化流式傳輸 - 如何通過最新和聚合計數進行重複數據刪除
time location user type
1 A 1 one
2 A 1 two
1 B 2 one
2 B 2 one
1 A 3 two
1 A 4 one
輸出示例:
類似如下:
val aggTypes = df
.select($"location", $"time", $"user", $"type")
.groupBy($"user")
.agg(max($"timestamp") as 'timestamp)
.select("*")
.withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
.agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
.drop($"window")
結構化數據流不支持多聚合,並且不支持流DataFrames非基於時間窗/數據集。我不確定是否有可能在1個流式查詢中實現所需的輸出。
任何幫助表示讚賞。