我有一個Kafka主題,我發送位置事件(key = user_id,value = user_location)。我能夠讀取和處理它作爲一個KStream
:Kafka Streams API:KStream到KTable
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
行之有效,但我想有一個與每個用戶的最後已知位置的KTable
。我怎麼能這樣做?
我能做到這一點寫入和中間話題閱讀:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
有一個簡單的方法來獲得從KStream
一個KTable
?這是我第一個使用Kafka Streams的應用程序,所以我可能錯過了一些明顯的東西。
我試圖用你的方法通過做一個'KStream'來構建'KTable'啞巴'groupByKey',但'groupByKey'方法無法解析。你有什麼想法可能會出錯嗎? (我是java生態系統和kafkas的新手) – LetsPlayYahtzee
什麼是您的Streams版本?對於舊版本,它應該是'stream.reduceByKey(...)'而不是'stream.groupByKey()。reduce(...)'。請參閱http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –
我以爲我使用的是最新版本,但我在使用'0.10.0'時查看'0.10.1'版本的文檔。所以我修正了它:) thnx – LetsPlayYahtzee