2017-01-09 110 views
3

我的Kafka主題包含由deviceId鍵入的狀態。我想使用KStreamBuilder.stream().groupByKey().aggregate(...)僅保留TimeWindow中的最新狀態值。我想的是,只要題目由鍵分區,則聚合功能可以隨時以這種方式返回的最新值:卡夫卡流聚合是否有任何訂購保證?

(key, value, older_value) -> value

這是一個保證,我可以從卡夫卡流期望?我應該推出自己的檢查時間戳的處理方法嗎?

回答

4

卡夫卡流保證通過偏移但不時間戳排序。因此,默認情況下,「上次更新獲勝」策略基於偏移量,但不以時間戳爲基礎。遲到的記錄(在時間戳上定義的「遲到」)是基於時間戳的亂序,並且它們不會被重新排序以保持原始偏移量的順序。

如果你想讓你的窗口包含基於時間戳的最新值,你將需要使用Processor API(PAPI)來完成這項工作。

在Kafka Streams的DSL中,您無法訪問獲取正確結果所需的記錄時間戳。一個簡單的方法可能是在.groupBy()之前加上.transform(),並將時間戳添加到記錄(即其值)本身。因此,您可以使用Aggregator中的時間戳(順便說一句:.reduce()也可以,而不是.aggregate())。最後,您需要在.aggregate()之後執行.mapValues()以再次從該值中刪除時間戳。

使用DSL和PAPI的這種混合匹配方法應該可以簡化您的代碼,因爲您可以使用DSL窗口支持和KTable,並且不需要進行低級時間窗口和狀態管理。

當然,你也可以在一個單一的低級有狀態處理器中完成所有這些,但我不會推薦它。

+1

如果我理解的很好,這意味着排序不保證;) – Steve

+2

我更新了我的問題。有抵消的訂單保證基礎。 –