2016-11-10 128 views
4

考慮下面的代碼:爲什麼我沒有看到Kafka Streams reduce方法的輸出?

KStream<String, Custom> stream = 
    builder.stream(Serdes.String(), customSerde, "test_in"); 

stream 
    .groupByKey(Serdes.String(), customSerde) 
    .reduce(new CustomReducer(), "reduction_state") 
    .print(Serdes.String(), customSerde); 

我有減速機,成功地打印出時,我想到的減少發生的方法適用於內println聲明。但是,上面顯示的最終打印語句不顯示任何內容。同樣如果我使用to方法而不是print,我在目標主題中看不到消息。

reduce語句後需要什麼來查看減少的結果?如果一個值被推送到輸入,我不期望看到任何東西。如果使用同一個鍵的第二個值被推送,我期望reducer被應用(它所做的),我也期望減少的結果繼續到處理管道中的下一個步驟。如上所述,我在管道的後續步驟中沒有看到任何東西,我不明白爲什麼。

+2

嘗試設置'StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG'到值0。 –

+0

@ MatthiasJ.Sax由於!這爲我解決了這個問題,請隨時發佈它作爲答案,以便我可以給你獎勵點數。如果你可以包含一個關於這個配置細節和其他喜歡它的更多信息的鏈接,我也會很感激它。 – LaserJesus

回答

7

從Kafka 0.10.1.0開始,所有聚合運算符都使用內部重複數據刪除緩存來減少結果KTable更改日誌流的負載。例如,如果您使用同一個鍵直接計數並處理兩個記錄,則完整的更新日誌流將爲<key:1>, <key:2>

使用新的緩存功能,緩存將接收到<key:1>並存儲它,但不會立即向下遊發送。計算<key:2>時,它將替換緩存的第一個條目。根據緩存大小,不同密鑰的數量,吞吐量以及提交間隔,緩存會向下遊發送條目。這發生在單個密鑰條目的緩存逐出或緩存的完全刷新(發送所有下游條目)時發生。因此,KTable更新日誌可能只顯示<key:2>(因爲<key:1>得到了重複刪除)。

您可以通過Streams配置參數StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG來控制緩存的大小。如果將值設置爲零,則完全禁用緩存,並且KTable更新日誌將包含所有更新(有效提供0.10.1.0行爲)。

匯合的文檔包含的截面說明更詳細地緩存:

相關問題