2017-03-21 122 views
12

我有一個Kafka主題,我發送位置事件(key = user_id,value = user_location)。我能夠讀取和處理它作爲一個KStreamKafka Streams API:KStream到KTable

KStreamBuilder builder = new KStreamBuilder(); 

KStream<String, Location> locations = builder 
     .stream("location_topic") 
     .map((k, v) -> { 
      // some processing here, omitted form clarity 
      Location location = new Location(lat, lon); 
      return new KeyValue<>(k, location); 
     }); 

行之有效,但我想有一個與每個用戶的最後已知位置的KTable。我怎麼能這樣做?

我能做到這一點寫入和中間話題閱讀:

// write to intermediate topic 
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux"); 

// build KTable from intermediate topic 
KTable<String, Location> table = builder.table("location_topic_aux", "store"); 

有一個簡單的方法來獲得從KStream一個KTable?這是我第一個使用Kafka Streams的應用程序,所以我可能錯過了一些明顯的東西。

回答

12

目前沒有直接的方法來做到這一點。您的方法絕對有效,如Confluen常見問題解答中所述:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

這是關於代碼的最簡單的方法。然而,它的缺點是(a)你需要管理一個額外的主題,並且(b)它會導致額外的網絡流量,因爲數據被寫入並重新讀取卡夫卡。

還有一個替代方案中,使用「僞減少」:

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, Long> stream = ...; // some computation that creates the derived KStream 

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() { 
     @Override 
     public Long apply(Long aggValue, Long newValue) { 
      return newValue; 
     } 
    }, 
    "dummy-aggregation-store"); 

這種方法在某種程度上相對於代碼相比選項1更復雜,但是具有這樣的優點是:(a )不需要手動管理主題,並且(b)不需要重新讀取來自Kafka的數據。

總體而言,你需要自己來決定,哪種方法你更喜歡:

在選項2中,卡夫卡流將創建一個內部的changelog話題備份KTable容錯。因此,這兩種方法都需要卡夫卡額外的存儲空間,並導致額外的網絡流量。總的來說,這是在選項2中稍微更復雜的代碼與選項1中的手動主題管理之間的折衷。

+0

我試圖用你的方法通過做一個'KStream'來構建'KTable'啞巴'groupByKey',但'groupByKey'方法無法解析。你有什麼想法可能會出錯嗎? (我是java生態系統和kafkas的新手) – LetsPlayYahtzee

+1

什麼是您的Streams版本?對於舊版本,它應該是'stream.reduceByKey(...)'而不是'stream.groupByKey()。reduce(...)'。請參閱http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –

+1

我以爲我使用的是最新版本,但我在使用'0.10.0'時查看'0.10.1'版本的文檔。所以我修正了它:) thnx – LetsPlayYahtzee

相關問題