2017-05-31 219 views
2

我有一個kafka流 - 說博客和卡夫卡表 - 說與這些博客相關的評論。卡夫卡流的關鍵字可以映射到卡夫卡表中的多個值,即一個博客可以有多個評論。我想要對這兩者進行連接,並用一系列註釋ID創建一個新對象。但是當我進行連接時,流只包含最後一個註釋ID。是否有任何文檔或示例代碼可以指出我如何實現這一目標?基本上,是否有任何文檔闡述如何使用Kafka流和Kafka表進行一對多關係連接?卡夫卡流和卡夫卡表一對多關係加入

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl, 
       (blogId, blog) -> blog.getBlogId(), 
       (blog, comment) -> new EnrichedBlog(blog, comment)); 

因此,而不是評論 - 我需要有一個評論ID數組。

回答

3

我無法找到相匹配的是,在你的代碼示例中的簽名的連接方法,但在這裏,我認爲這是什麼問題:

KTables被解釋爲changlog,也就是說,每一個後續消息相同的鍵被解釋爲對記錄的更新,而不是新的記錄。這就是爲什麼你只看到給定鍵(博客ID)的最後一條「評論」消息,以前的值將被覆蓋。 爲了克服這個問題,您需要首先改變如何填充KTable。您可以做的是將您的評論主題作爲KStream添加到您的拓撲中,然後執行聚合,只需構建一個數組或共享相同博客ID的註釋列表。該聚合返回一個KTable,您可以加入您的博客KStream。

這裏有一個草圖,你如何能做到這一點,構建一個列表值KTable:

builder.stream("yourCommentTopic") // where key is blog id 
.groupByKey() 
.aggregate(() -> new ArrayList(), 
    (key, value, agg) -> new KeyValue<>(key, agg.add(value)), 
    yourListSerde); 

名單更容易比數組中的聚合來使用,所以我建議你把它轉換成數組下游如果需要的話。您還需要在上面的示例中爲您的列表「yourListSerde」提供一個serde實現。

+0

爲了便於閱讀,我想補充一點,「yourCommentTopic」的關鍵是相應的博客帖子ID。然後'groupByKey'步驟確保隨後的聚合步驟可以訪問特定博客文章的所有評論(因此可以創建所有評論的列表)。 –

+0

謝謝!修改了這個效果的答案 –