1

我希望通過窗口期執行結構化流式聚合。鑑於以下數據模式。目標是根據用戶最近發生的事件進行過濾。然後彙總每個位置的每種事件類型的計數。Spark結構化流式傳輸 - 如何通過最新和聚合計數進行重複數據刪除

time location user type 
1  A   1  one 
2  A   1  two 
1  B   2  one 
2  B   2  one 
1  A   3  two 
1  A   4  one 

輸出示例:

​​

類似如下:

val aggTypes = df 
    .select($"location", $"time", $"user", $"type") 
    .groupBy($"user") 
    .agg(max($"timestamp") as 'timestamp) 
    .select("*") 
    .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds") 
    .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location") 
    .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo))) 
    .drop($"window") 

結構化數據流不支持多聚合,並且不支持流DataFrames非基於時間窗/數據集。我不確定是否有可能在1個流式查詢中實現所需的輸出。

任何幫助表示讚賞。

回答

0

好像你正在嘗試做無狀態聚合。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroups(org.apache.spark.api.java.function.FlatMapGroupsFunction,%20org.apache.spark.sql.Encoder)

flatMapGroups是一個聚合API,它將一個函數應用於數據集中的每個組。它僅適用於分組數據集.flatMapGroups不支持增加混洗開銷的部分聚合。因此,請僅使用此API來執行適合內存的小批量聚合。還建議使用reduce函數或聚合器。 https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/expressions/Aggregator.html

val count = words.groupByKey(x => x) 
      .flatMapGroups 
      { 
       case (x, iterator) ⇒ Iterator((x, iterator.length)) 
       }.toDF("x", "count")   


count.writeStream.format("console").outputMode(OutputMode.Append()) 
相關問題