2016-11-24 121 views
7

我有兩個Kafka主題將來自不同來源的完全相同的內容進行流式傳輸,因此如果其中一個來源發生故障,我可以獲得高可用性。 我試圖將2個主題合併爲1個輸出主題,使用Kafka Streams 0.10.1.0,這樣我就不會錯過關於失敗的任何消息,並且在所有源都啓動時沒有重複。合併多個相同的Kafka Streams主題

當使用KStream的leftJoin方法時,其中一個主題可以順利進行(次要主題),但當主要主題關閉時,不會向輸出主題發送任何內容。這似乎是因爲,根據Kafka Streams developer guide

KStream-KStream leftJoin總是由主氣流到達記錄驅動

因此,如果沒有記錄從主流到來,即使它們存在,也不會使用輔助流中的記錄。主流重新聯機後,輸出恢復正常。

我也使用outerJoin(添加重複記錄),接着轉換到KTable和groupByKey擺脫重複的,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

嘗試,但我還是在一段時間得到重複一次。我還使用commit.interval.ms=200來讓KTable經常發送到輸出流。

什麼是最好的方式來處理這種合併從多個相同的輸入主題完全一次輸出?

+0

一般來說,我會推薦Processor API來解決這個問題。您也可以嘗試切換到當前的「中繼」版本(不確定這是否適用於您)。連接被重寫了,它可以解決你的問題:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics新的連接語義將被包含在卡夫卡0.10.2中目標發佈日期爲2017年1月(https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan)。 –

+0

@ MatthiasJ.Sax我切換到了trunk,看起來'leftJoin'現在像KStream-KStream連接的'outerJoin'一樣,所以我想我會回到10.1的語義。我現在正在嘗試的是創建一個虛假的流,輸出null值,我將它用作主要的leftJoin中的主要元素,並在與輔助元素的leftJoin中使用該合併。我希望這會導致始終在主流中存在值,即使我的主服務器已關閉(因爲我只會從第一個左連接中獲得空值)。 –

+0

新的'leftJoin'確實觸發了雙方的舊'outerJoin'也(我想這就是你的意思,「現在看起來像leftJoin現在像一個外部連接」?) - 這比舊的'leftJoin'更接近於SQL語義 - 但是'leftJoin'仍然不同於'outerJoin':如果右側觸發並且沒有找到聯合夥伴,它將丟棄該記錄並且不發射結果。 –

回答

5

使用任何種類的連接都不能解決您的問題,因爲您總是會以丟失結果(內連接以防某些流停頓)或「重複」null(左連接或外連接兩個流都在線)。請參閱https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics瞭解有關Kafka Streams中連接語義的詳細信息。

因此,我會建議使用處理器的API,你可以使用KStreamprocess()transform(),或者transformValues() DSL混合和匹配。有關更多詳細信息,請參閱How to filter keys and value with a Processor using Kafka Stream DSL

您還可以將自定義存儲添加到您的處理器(How to add a custom StateStore to the Kafka Streams DSL processor?)以進行重複過濾容錯。

相關問題