2017-07-06 127 views
0

我有2個流都對鍵和對值相同的類型,卡夫卡foreing鍵連接

- first represent a finantial instrument with key (string) Currency (Eur-USD) 
    - second represent a finantial instrument with key (tenor) Eur-3month , Eur-6month , USD-3month 


    - first stream: <key, value> = <Eur , { data , .... } > 
    - second stream: <key, value> = <Eur-3month , { data .... }> 

的要求是最後一個流必須始終加入其他流這取決於最後的關鍵收到(月:3M,6M,7M)

- i thought that the streams must be K-tables is this the correct way to join them and have in output always the last join on the last updates ? 
    - i can have the same results with a stream ? 

在此聲明我已經發現的最類似的事情,我可以用的是

KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, 
           final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, 
           final ValueJoiner<? super V, ? super GV, ? extends RV> joiner) 

使用KeyValueMapper我可以加入鍵 但左側我有一個流,而不是K-表,這將不會更新左側

回答

0

通常選擇較小的「更新」中加入通過從kafka作爲表(KStreamBuilder.table())或通過使用.groupByKey(),然後使用reduce()或aggregate()來將它物化爲KTable。 。

然後你加入你的其他流與該KTable。

或者,您可以將兩個流實現爲KTables並加入它們。我並沒有完全理解你的用例,因此不能建議哪個更好。

參見:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

+0

問題是類似於此https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams但ktable-ktable,是的,你沒有得到這個問題 –

+0

如果你用貨幣代碼分割你的主題,那麼你不需要使用GlobalKTables。使用普通的KTables可以獲得更多的連接選項。 –

+0

問題不在GlobalKtables周圍,問題是有一個外鍵,在一個表中我們有一個K和另一個K-K',我想連接k = k –