2017-06-15 69 views
0

所以,我有兩個不同的KStream就像這樣:加入兩種不同KStreams

流1:(String鍵,Object值1)

流2:(String鍵,Object值2)

我想加入他們,這樣我就會看到一個類似於(Object value1,Object value2)的流。

乾淨的方法是什麼?

+0

有幾個連接類型,取決於你想要達到的目標。這是在信息文章,可能會幫助你進一步:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics – jvwilge

回答

1

工作的一種方式是加入兩個流,使得生成的流的值是包含兩個原始值的容器類。然後,映射流以將值從容器中取出並將其中一個用作關鍵字。

代碼:

KStream<String, Object> stream1; 
KStream<String, Object> stream2; 

KStream<Object, Object> joinedStream = stream1 
     .join(stream2, (value1, value2) -> new MyValueContainer(value1, value2)) 
     .map((key, container) -> new KeyValue<Object, Object>(container.getValue1(), container.getValue2())); 
+0

在連接我認爲你的意思是寫新的MyValueContainer(value1,value2 )。 –

+0

@MichalBorowiecki我的意思是這樣做 - 謝謝指出 –