除了什麼馬蒂亞斯J.薩克斯說,有一個流來流(窗口)在加入例如: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
這是匯合3.1.X與Apache卡夫卡0.10.1,即截至2017年1月的最新版本。有關使用較新版本的代碼示例,請參見上面存儲庫中的master
分支。
以上是代碼示例的關鍵部分(同樣,對於卡夫卡0.10.1),稍微適合您的問題。請注意,這個例子恰好證明了一個OUTER JOIN。
long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
(impressionValue, clickValue) -> impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");
謝謝,我會檢查它,並接受你的答案,當我可以運行它。我已經閱讀了你提到的大多數例子,但是我找不到任何KStream Windowed join – Am1rr3zA
另外。我怎麼能指定不同的窗口大小,因爲在我的情況下,我想加入10天的stream-1與30天的stream-2 – Am1rr3zA
對不起,關於這些例子。似乎只有KTable連接...(也有KStream-KStream連接)。無論如何。關於「加入10天的stream-1和30天的stream-2」:這對於Kafka Streams來說是不可能的,因爲Kafka Streams只支持Sliding-Window-Joins - 你需要一個Hopping-Window-Join。 –