2017-02-17 170 views
3

我有一個Kafka Streams(0.10.1.1)的問題。我正在嘗試在同一主題上創建一個KStream和一個KTableKafka Streams - 獲得KTable和KStream相同主題的最佳方式?

我試過的第一種方法是簡單地調用KStreamBuilder方法來處理同一主題上的流和表。這導致

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source. 

好的,這似乎是Kafka Streams內置的一些限制。

我的第二種方法是最初創建一個KTable並使用其上的toStream()方法。這有KTables做一些內部緩衝/沖洗的問題,所以如果我的例子中鍵多次出現,輸出流不會反映所有輸入元素。這是一個問題,因爲我正在計算密鑰的出現次數。

似乎工作的方法是最初創建一個KStream,按鍵對它進行分組,然後通過丟棄舊聚合並僅保留新值來「減少」它。我對這種方法並不滿意,因爲a)它看起來非常複雜,並且b)接口沒有指定哪個是已經聚集的值,哪個是新的。我按照慣例去了第二個,但是...... meh。

所以問題歸結爲:有沒有更好的方法?我錯過了一些非常明顯的東西嗎?

請記住,我沒有正確使用用例 - 這只是我開始瞭解Streams-API。

回答

3

關於添加主題兩次:這是不可能的,因爲Kafka Streams應用程序是單個「消費者組」,因此一次只能提交一個主題的偏移量,而兩次添加主題則表明該主題獲取的消費者兩次(和獨立進展)。

對於方法KTable#toString(),可以通過StreamsConfig參數cache.max.bytes.buffering == 0禁用高速緩存。但是,這是全局設置,並禁用所有KTable s的緩存/重複數據刪除(請參閱http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。

groupBy方法也適用,即使它需要一些樣板。我們正在考慮在API中添加KStream#toTable()以簡化此轉換。是的,reduce中的第二個參數是新值 - 由於reduce是用於合併兩個值,所以API沒有「old」和「new」的概念,因此參數沒有這樣的命名。

+0

爲什麼不能重複使用以前添加的主題? – kvatashydze

+0

正如答案中所述。因爲我們使用單個消費者,而消費者只能訂閱一個主題。 –

相關問題