我正在學習Kafka Stream並使用Processor API來實現我的用例。下面的代碼顯示了Process方法,它在調用commit
之前將消息轉發到下游並中止。這會導致流被重新處理並複製宿端上的消息。如何以完全一次的模式實現處理器API
public void process(String key, String value) {
context.forward(key, value);
..
..
//killed
context.commit();
}
processing.guarantee參數:
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
是否有隻調用commit
語句時應用轉發的方式。如果不是的話,那麼實施完全一次模式的正確方法是什麼。
謝謝
嗨,蘇尼爾,只有當過程停止優雅時纔有效。 – user3812692