1
所以我需要有一個GlobalKTable
包含跨多個實例的幾個消息的聚合。現在,我的單實例KTable
設置看起來是這樣的:卡夫卡流 - 如何做全球度量聚合?
final KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> 0d,
new MetricsAggregator(),
Serdes.Double(),
LOCAL_METRICS_STORE_NAME);
顯然,這不能擴展,因爲每個實例只用於已收到的消息更新指標,不是對所有收到的消息所有其他實例。我想用這個的:
final KStreamBuilder builder = new KStreamBuilder();
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME);
,然後就流更新到我的aggregatedMetrics
KTable到METRIC_CHANGES_TOPIC
,這將更新全局表。但是,每次更新全局表時,每個實例都會覆蓋其他實例的聚合。
有什麼辦法可以做全球聚合嗎?
謝謝!你能否給我一個鏈接到更詳細討論這個文檔的文檔? –
希望這有助於:http://docs.confluent.io/current/streams/architecture.html –