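Kafka Streams DSL: aggregate, enrich and send through

We have the following problem that we need to solve with Kafka Streams:
1- Get a message. Each message is marked with an eventId (the event that the message updates) and a correlationId (unique for each message).
2- Aggregate some state from that message (based on the eventId) and append it to the already existing state in the local store.
3- Enrich that message with the full aggregated state for that event and send it through to the output topic.
The gist is that we really cannot lose a single message, and each incoming message must always be enriched with the latest aggregated state (the one we actually evaluated while processing that message).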
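As a point of reference, the three steps can be modeled in plain Scala with no Kafka involved. This is only a sketch of the desired semantics, not a Kafka Streams implementation: `Msg`, `Enriched`, `EnrichSketch` and the use of a `Set[String]` as the aggregated state are hypothetical stand-ins for the real types.

```scala
// Hypothetical stand-ins for the real message types (assumptions, not the original classes).
final case class Msg(eventId: String, correlationId: String, markets: Set[String])
final case class Enriched(msg: Msg, aggregatedMarkets: Set[String])

object EnrichSketch {
  // For every input message: update the per-event state, then emit that same
  // message together with the state as it stands right after the update.
  def enrichAll(messages: Seq[Msg]): Seq[Enriched] = {
    val initial = (Map.empty[String, Set[String]], Vector.empty[Enriched])
    val (_, out) = messages.foldLeft(initial) {
      case ((store, acc), m) =>
        val updated = store.getOrElse(m.eventId, Set.empty[String]) ++ m.markets
        (store.updated(m.eventId, updated), acc :+ Enriched(m, updated))
    }
    out
  }
}
```

The property that matters is that the output has exactly one record per input record, so two messages for the same eventId yield two enriched messages, the second carrying the state accumulated from both.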
From what I have seen so far, we cannot just use a simple aggregation, something like this:
    stateMessageStream
        .map((k, v) => new KeyValue[String, StateMessage](k, v))
        .mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))})
        .groupBy((k, _) => k, stringSerde, marketAggregatorSerde)
        .aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
        .to(stringSerde, marketAggregatorSerde, kafkaOutTopic)
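This does not work, because the aggregation only produces new records at intervals, which means that for two incoming messages we might generate only a single aggregated output message (so we lose one message).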
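The effect described here can be illustrated with a toy per-key buffer that is flushed periodically. This is a simplified model written for illustration, not Kafka's actual implementation; `FlushingBuffer` is a hypothetical name. In Kafka Streams itself, as far as I know, this kind of consolidation is governed by the record cache (`cache.max.bytes.buffering`) and `commit.interval.ms`.

```scala
import scala.collection.mutable

// Toy model (assumption): updates are buffered per key and only the latest
// value for each key is emitted when the buffer is flushed.
final class FlushingBuffer[K, V] {
  private val pending = mutable.LinkedHashMap.empty[K, V]

  // A later update for the same key overwrites the earlier one.
  def put(key: K, value: V): Unit = pending.update(key, value)

  // Emit whatever is buffered and start over.
  def flush(): List[(K, V)] = {
    val out = pending.toList
    pending.clear()
    out
  }
}
```

With this model, two updates for the same key between flushes produce a single downstream record:

```scala
val buf = new FlushingBuffer[String, Int]
buf.put("event-1", 1)
buf.put("event-1", 2)
buf.flush() // List(("event-1", 2)): the first update is never emitted
```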
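My second attempt was basically two streams: one for the aggregation and a second one for the ordinary messages. In the end, the two streams can be joined back together with a join operation keyed on correlationId, so that the correct state is matched to the proper message: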
    val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream
        .map((k, v) => new KeyValue[String, StateMessage](k, v))
        .mapValues[StateMessage](v => {
            log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId)
            v
        })
        .mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))})
        .groupBy((k, v) => k, stringSerde, marketAggregatorSerde)
        .aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName)
        .toStream((k, v) => v.correlationId)

    stateMessageStream
        .selectKey[String]((k, v) => v.correlationId)
        .leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId),
            JoinWindows.of(10000),
            stringSerde, stateMessageSerde, marketAggregatorSerde)
        .mapValues[StateMessageWithMarkets](v => {
            log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") +
                ", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown"))
            v
        })
        .to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic)
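However, this does not seem to work either: for two incoming messages I still get only a single message on the output topic, carrying the latest aggregated state.
Can someone explain why, and what the correct solution would be?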