考慮下面的代碼:爲什麼我沒有看到Kafka Streams reduce方法的輸出?
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")
.print(Serdes.String(), customSerde);
我有減速機,成功地打印出時,我想到的減少發生的方法適用於內println
聲明。但是,上面顯示的最終打印語句不顯示任何內容。同樣如果我使用to
方法而不是print
,我在目標主題中看不到消息。
reduce語句後需要什麼來查看減少的結果?如果一個值被推送到輸入,我不期望看到任何東西。如果使用同一個鍵的第二個值被推送,我期望reducer被應用(它所做的),我也期望減少的結果繼續到處理管道中的下一個步驟。如上所述,我在管道的後續步驟中沒有看到任何東西,我不明白爲什麼。
嘗試設置'StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG'到值0。 –
@ MatthiasJ.Sax由於!這爲我解決了這個問題,請隨時發佈它作爲答案,以便我可以給你獎勵點數。如果你可以包含一個關於這個配置細節和其他喜歡它的更多信息的鏈接,我也會很感激它。 – LaserJesus