2016-12-01 65 views
2

我有一個Kafka流,它從一個主題獲取數據,並且需要將這些信息過濾爲兩個不同的主題。Kafka - 如何同時使用過濾器和filternot?

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic"); 
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic"); 
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic"); 

然而,當我像這樣做,它會讀取從主題數據的兩倍 - 不知道有作爲數據變大對性能產生任何影響。有沒有辦法只過濾一次,並推到兩個主題?

回答

1

你的做法是正確的,數據是無法讀取的話題兩次並且還有事情沒有內部數據複製。你的方法的唯一缺點是,兩個過濾器謂詞都會針對每條記錄進行評估 - 但是,這非常便宜,不應該是性能問題。

但是,您仍然可以通過使用KStream#branch()來提高性能,該程序確實接受多個謂詞,並相互評估所有謂詞,併爲每個謂詞返回一個輸入流。如果記錄與謂詞相匹配,則將其放入相應的輸出流中,並停止評估(即,不對此單個記錄評估進一步的謂詞 - 這可確保將每個記錄添加到最大一個輸出流;或者如果沒有謂詞匹配)。

因此,您可以只提供branch()的兩個謂詞:第一個謂詞與原始filter()謂詞相同,第二個謂詞總是返回true

KStream<String, Model> stream = builder.stream(
    Serdes.String(), 
    specificAvroSerde, 
    "not-filtered-topic" 
); 
KStream[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value), 
    (key, value) -> true 
); 
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic"); 
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic"); 

不知道這段代碼是否比您的原始版本更好的可讀性。我想這是一個有趣的問題,我個人更喜歡你的原始代碼,因爲它確實更好地表達了語義。

我添加的版本應該略高於CPU效率,因爲對於所有滿足謂詞的記錄來說,它只被評估一次。對於所有不滿足結果的記錄,將返回一個簡單的true(即,沒有第二個謂詞評估)。

如果您知道,大多數的記錄將在splitStream[1]結束了,你也可以顛倒謂詞(和使用splitStream[0]爲「壞流」),以減少呼叫的數量到第二true -returning謂語。但這些只是微觀優化,並不重要。

相關問題