1
我已經編寫了一個結構化流式聚合,它從卡夫卡源中獲取事件,執行簡單計數並將它們寫回到卡桑德拉數據庫。代碼如下所示:結構化流式聚合返回錯誤的值
val data = stream
.groupBy(functions.to_date($"timestamp").as("date"), $"type".as("type"))
.agg(functions.count("*").as("value"))
val query: StreamingQuery = data
.writeStream
.queryName("group-by-type")
.format("org.apache.spark.sql.streaming.cassandra.CassandraSinkProvider")
.outputMode(OutputMode.Complete())
.option("checkpointLocation", config.getString("checkpointLocation") + "/" + "group-by-type")
.option("keyspace", "analytics")
.option("table", "aggregations")
.option("partitionKeyColumns", "project,type")
.option("clusteringKeyColumns", "date")
.start()
問題是計數剛剛結束每一批。所以我會看到卡桑德拉的數量下降。計數不應該一天下降,我該如何實現這一目標?
編輯: 我一直在使用的窗口聚集太多,同樣的事情
你試過更新模式嗎? –
是的,我曾嘗試過,同樣的行爲 –