2017-06-26 89 views
1

所以我需要有一個GlobalKTable包含跨多個實例的幾個消息的聚合。現在,我的單實例KTable設置看起來是這樣的:卡夫卡流 - 如何做全球度量聚合?

final KTable<String, Double> aggregatedMetrics = eventStream 
     .groupByKey(Serdes.String(), jsonSerde) 
     .aggregate(
       () -> 0d, 
       new MetricsAggregator(), 
       Serdes.Double(), 
       LOCAL_METRICS_STORE_NAME); 

顯然,這不能擴展,因爲每個實例只用於已收到的消息更新指標,不是對所有收到的消息所有其他實例。我想用這個的:

final KStreamBuilder builder = new KStreamBuilder(); 
builder.globalTable(METRIC_CHANGES_TOPIC, METRICS_STORE_NAME); 

,然後就流更新到我的aggregatedMetrics KTable到METRIC_CHANGES_TOPIC,這將更新全局表。但是,每次更新全局表時,每個實例都會覆蓋其他實例的聚合。

有什麼辦法可以做全球聚合嗎?

回答

1

解決方案對我來說聽起來正確。

這聽起來並不正確:

然而,每個實例也只是在每次更新全局表覆蓋的其他實例聚合。

請注意,聚合是基於密鑰完成的。因此,不同的實例將聚合在不同的密鑰上,因此,每個實例將只更新其在GlobalKTable中的自己的密鑰。

+0

謝謝!你能否給我一個鏈接到更詳細討論這個文檔的文檔? –

+1

希望這有助於:http://docs.confluent.io/current/streams/architecture.html –