2017-01-17 45 views
3

基於apache Kafka docsKStream-to-KStream Joins are always windowed joins,我的問題是如何控制窗口的大小?這個尺寸與保持關於該主題的數據的尺寸相同嗎?或者我們可以將數據保留1個月,但只是過去一週加入流?如何管理卡夫卡KStream到Kstream窗口連接?

有什麼好的例子來顯示一個窗口kStream到kStream窗口連接?

在我的情況下,假設我有2個KStream,kstream1kstream2我希望能夠加入kstream1的10天到kstream2的30天。

回答

8

這絕對有可能。當你定義你的Stream運算符時,你明確指定了連接窗口的大小。

KStream stream1 = ...; 
KStream stream2 = ...; 
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes 
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days 

stream1.leftJoin(stream2, 
       ... // add ValueJoiner 
       JoinWindows.of(joinWindowSizeMs) 
); 

// or if you want to use retention time 

stream1.leftJoin(stream2, 
       ... // add ValueJoiner 
       (JoinWindows)JoinWindows.of(joinWindowSizeMs) 
             .until(windowRetentionTimeMs) 
); 

查看http://docs.confluent.io/current/streams/developer-guide.html#joining-streams瞭解更多詳情。

滑動窗口基本上定義了一個額外的連接謂詞。在類似SQL的語法,這將是這樣的:在這個例子中

SELECT * FROM stream1, stream2 
WHERE 
    stream1.key = stream2.key 
    AND 
    stream1.ts - before <= stream2.ts 
    AND 
    stream2.ts <= stream1.ts + after 

其中before == after == joinWindowSizeMs。如果您使用JoinWindows#before()JoinWindows#after()明確設置這些值,則beforeafter也可以具有不同的值。

源主題的保留時間完全獨立於指定的windowRetentionTimeMs,該指定應用於由Kafka Streams自身創建的更改日誌主題。窗口保留允許彼此無序地加入記錄,即記錄遲到(記住,卡夫卡具有基於的偏移的訂單保證,但關於時間戳,序)。

+0

謝謝,我會檢查它,並接受你的答案,當我可以運行它。我已經閱讀了你提到的大多數例子,但是我找不到任何KStream Windowed join – Am1rr3zA

+0

另外。我怎麼能指定不同的窗口大小,因爲在我的情況下,我想加入10天的stream-1與30天的stream-2 – Am1rr3zA

+0

對不起,關於這些例子。似乎只有KTable連接...(也有KStream-KStream連接)。無論如何。關於「加入10天的stream-1和30天的stream-2」:這對於Kafka Streams來說是不可能的,因爲Kafka Streams只支持Sliding-Window-Joins - 你需要一個Hopping-Window-Join。 –

2

除了什麼馬蒂亞斯J.薩克斯說,有一個流來流(窗口)在加入例如: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

這是匯合3.1.X與Apache卡夫卡0.10.1,即截至2017年1月的最新版本。有關使用較新版本的代碼示例,請參見上面存儲庫中的master分支。

以上是代碼示例的關鍵部分(同樣,對於卡夫卡0.10.1),稍微適合您的問題。請注意,這個例子恰好證明了一個OUTER JOIN。

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5); 
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30); 

final Serde<String> stringSerde = Serdes.String(); 
KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic"); 
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic"); 

KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents, 
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue, 
    // KStream-KStream joins are always windowed joins, hence we must provide a join window. 
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs), 
    stringSerde, stringSerde, stringSerde); 

// Write the results to the output topic. 
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");