我試圖過濾出在給定(跳躍)時間窗口中長度爲T
的密鑰出現頻率高於閾值N
的任何消息。Kafka Streams - 過濾在時間窗口中頻繁出現的消息
例如,下面的流中:
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
和N=2
和T=3
,結果應該是
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
可替代地,如果上述是不可能的,簡化是隻在滿足閾值後過濾消息:
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
Kafka Streams可能嗎?
到目前爲止,我已經嘗試創建流的windowed count
(KTable
的實例)並將其連接回原始流。我使用KTable#toStream((k,v) -> k.key())
將windowed count
的密鑰更改回原始密鑰,並將dummy aggregation更改回KTable
的實例。這似乎會引起延遲,導致leftJoin
錯過超過閾值後非常接近的消息。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
我還試圖執行KStream
- KStream
加入與適當的窗口(離開了虛設聚合):
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
因爲每個UPSERT
成wcount
這導致重複的輸出觸發事件。
謝謝,我得到了這個工作(幾乎)根據需要。對於[翻滾窗口](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing),除了在新時間窗口開始時的事件(當相關事件在前面的窗口)。對於跳頻窗口,由於多個窗口可以包含單個觸發事件序列,因此輸出流中會有重複。所以我選擇使用Processor API和窗口化持久存儲來實現解決方案。 –