我有2周卡夫卡的話題 - recommendations
和clicks
。第一個主題具有由唯一ID(稱爲recommendationsId
)鍵入的推薦對象。每個產品都有一個用戶可以點擊的URL。卡夫卡流加入
的clicks
主題得到由上向用戶推薦產品的那些的URL的點擊生成的消息。它已經這樣設置,這些點擊消息也被recommendationId
鍵入。
注意,建議和點擊之間
關係是一對多。建議可能會導致多次點擊,但點擊總是與單個建議相關聯。
每個點擊對象都會有相應的推薦對象。
點擊對象有一個時間戳晚於建議反對。
的建議和相應的點擊(S)之間的差距可能是幾秒鐘至幾天(比如說,7天最多)。
我的目標是使用卡夫卡流加入來加入這兩個主題。我不清楚的是我應該使用KStream x KStream加入還是KStream x KTable加入。
我實施了KStream x KTable
加入clicks
流通過recommendations
表。但是,如果建議在生成之前之前生成了連接器,並且在連接器啓動之後點擊到達,我無法看到任何加入的點擊 - 建議對。
我使用正確的連接嗎?我應該使用KStream x KStream
加入嗎?如果是這樣,爲了能夠在過去至多7天內加入推薦的點擊,我應該將窗口大小設置爲7天嗎?在這種情況下,我是否還需要設置「保留期」?
我的代碼來執行KStream x KTable
加入如下。請注意,我已經定義了類Recommendations
和Click
及其相應的serde。點擊消息只是簡單的String
(url)。此URL字符串與Recommendations
對象合併,以創建發送到jointTopic
的Click
對象。
public static void main(String[] args){
if(args.length!=4){
throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
}
final String booststrapList = args[0];
final String clicksTopic = args[1];
final String recsTopic = args[2];
final String jointTopic = args[3];
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
// load clicks as KStream
KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
// load recommendations as KTable
KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
// join the two
KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
// emit the join to the jointTopic
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
// let the action begin
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
這工作得很好,只要雙方的建議和點擊的夾板(以上程序)後,已產生運行。但是,如果點擊到達,建議生成之前運行了連接器,我看不到任何連接發生。我該如何解決?
如果解決方案是使用KStream x KSTream
加入,那麼請幫助我瞭解什麼窗口大小,我應該選擇和選擇什麼樣的保留期限。
這可能有幫助:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/ –