0
對於用戶在我們的平臺上執行的每個事件,我們都有一個唯一主題的kafka消息。每個事件/ kafka消息都有一個公共字段userId。我們現在想從這個話題知道我們每小時有多少獨特的用戶。所以我們對用戶的事件類型和個人數量不感興趣。我們只想知道每個小時有多少獨特用戶在使用。 實現此目的的最簡單方法是什麼?我目前的想法似乎不是很簡單,看到這裏的僞代碼:如何在卡夫卡流應用中的固定時間窗口中統計唯一用戶?
stream
.selectKey() // userId
.groupByKey() // group by userid, results in a KGroupedStream[UserId, Value]
.aggregate(// initializer, merger und accumulator simply deliver a constant value, the message is now just a tick for that userId key
TimeWindows.of(3600000)
) // result of aggregate is KTable[Windowed[UserId], Const]
.toStream // convert in stream to be able to map key in next step
.map() // map key only (Windowed[Userid]) to key = startMs of window to and value Userid
.groupByKey() // grouping by startMs of windows, which was selected as key before
.count() // results in a KTable from startMs of window to counts of users (== unique userIds)
有沒有更簡單的方法?我可能忽略了一些東西