2017-10-19 133 views
0

我試圖過濾出在給定(跳躍)時間窗口中長度爲T的密鑰出現頻率高於閾值N的任何消息。Kafka Streams - 過濾在時間窗口中頻繁出現的消息

例如,下面的流中:

#time, key 
0, A 
1, B 
2, A 
3, C 
4, D 
5, A 
6, B 
7, C 
8, C 
9, D 
10, A 
11, D 
12, D 
13, D 
14, D 
15, D 

N=2T=3,結果應該是

0, A 
2, A 
7, C 
8, C 
9, D 
11, D 
12, D 
13, D 
14, D 
15, D 

可替代地,如果上述是不可能的,簡化是隻在滿足閾值後過濾消息:

#time, key 
2, A 
8, C 
11, D 
12, D 
13, D 
14, D 
15, D 

Kafka Streams可能嗎?

到目前爲止,我已經嘗試創建流的windowed countKTable的實例)並將其連接回原始流。我使用KTable#toStream((k,v) -> k.key())windowed count的密鑰更改回原始密鑰,並將dummy aggregation更改回KTable的實例。這似乎會引起延遲,導致leftJoin錯過超過閾值後非常接近的消息。

final Serde<String> stringSerde = Serdes.String(); 
    final Serde<Long> longSerde = Serdes.Long(); 

    KStream<String, Long> wcount = source.groupByKey() 
      .count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts") 
      .toStream((k,v) -> k.key()); 

    // perform dummy aggregation to get KTable 
    KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde) 
       .reduce((aggValue, newValue) -> newValue, 
       "dummy-aggregation-store"); 

    // left join and filter with threshold N=1 
    source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

我還試圖執行KStream - KStream加入與適當的窗口(離開了虛設聚合):

source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

因爲每個UPSERTwcount這導致重複的輸出觸發事件。

回答

1

這當然是可能的。您可以應用一個窗口聚合來收集列表中的所有原始數據(即,您手動實現窗口)。之後,您將應用一個評估窗口的flatMap。如果閾值尚未達到,則不發射任何信號。如果第一次達到閾值,則會發出所有緩衝數據。對於所有進一步調用flatMap的計數大於閾值的情況,您只需發出列表中最新的一個(您知道您之前已發出所有其他調用flatMap的函數,即僅發出新添加的函數)。

注意:您需要禁用KTable緩存,即設置config參數「cache.max.bytes.buffering」= 0。否則,算法將無法正常工作。

事情是這樣的:

KStream<Windowed<K>, List<V>> windows = stream.groupByKey() 
               .aggregate(
               /*init with empty list*/, 
               /*add value to list in agg*/, 
               TimeWindows.of()...), 
               ...) 
               .toStream(); 
KStream<K,V> thresholdMetStream = windows.flatMap(
              /* if List#size < threshold 
               then return empty-list, ie, nothing 
               elseif List#size == threshold 
               then return whole list 
               else [List#size > threshold] 
               then return last element from list 
              */); 
+1

謝謝,我得到了這個工作(幾乎)根據需要。對於[翻滾窗口](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing),除了在新時間窗口開始時的事件(當相關事件在前面的窗口)。對於跳頻窗口,由於多個窗口可以包含單個觸發事件序列,因此輸出流中會有重複。所以我選擇使用Processor API和窗口化持久存儲來實現解決方案。 –

相關問題