2017-10-06 17 views
0

對於用戶在我們的平臺上執行的每個事件,我們都有一個唯一主題的kafka消息。每個事件/ kafka消息都有一個公共字段userId。我們現在想從這個話題知道我們每小時有多少獨特的用戶。所以我們對用戶的事件類型和個人數量不感興趣。我們只想知道每個小時有多少獨特用戶在使用。 實現此目的的最簡單方法是什麼?我目前的想法似乎不是很簡單,看到這裏的僞代碼:如何在卡夫卡流應用中的固定時間窗口中統計唯一用戶?

stream 
.selectKey() // userId 
.groupByKey() // group by userid, results in a KGroupedStream[UserId, Value] 
.aggregate(// initializer, merger und accumulator simply deliver a constant value, the message is now just a tick for that userId key 
    TimeWindows.of(3600000) 
) // result of aggregate is KTable[Windowed[UserId], Const] 
.toStream // convert in stream to be able to map key in next step 
.map() // map key only (Windowed[Userid]) to key = startMs of window to and value Userid 
.groupByKey() // grouping by startMs of windows, which was selected as key before 
.count() // results in a KTable from startMs of window to counts of users (== unique userIds) 

有沒有更簡單的方法?我可能忽略了一些東西

回答

1

有兩兩件事可以做:

  1. 合併selectKey()groupByKey()groupBy()
  2. 你不需要toStream().map()一步,但你可以做一個新的密鑰直接在第一KTable重組

事情是這樣的:

stream.groupBy(/* put a KeyValueMapper that return the grouping key */) 
     .aggregate(... TimeWindow.of(TimeUnit.HOURS.toMillis(1)) 
     .groupBy(/* put a KeyValueMapper that return the new grouping key */) 
     .count()