一旦我收到來自kafka的消息,我需要運行一個長時間運行的進程(最多需要20秒),我只需要將消息視爲成功此過程完成。如何確保消息不會丟失使用彈簧KafkaMessageListenerContainer.java
另外我需要確保每個消息至少處理一次。
使用KafkaMessageListenerContainer具有以下屬性的思維:
ThreadPoolTaskExecutor類爲listenerTaskExecutor
使用類型AcknowledgingMessageListener
的一個MessageListener設置應答模式MANUL_IMMEDIATE。
但我唯一的問題是,如果特定的消息抵消說15就成功處理會發生什麼,但仍在處理中有14個消息。所以在這種情況下,我的偏移將更新爲15,即使14尚未處理
如何處理這些類型的情況?
感謝加里的迴應。那麼從spring-kafka的角度來看,你是否確保用戶沒有做到我上面所說的(意思是使用threadPoolExecutor作爲listnerTaskExecutor並使用MANUAL_IMMEDIATE模式)? –
不,沒有檢查;手動意味着您完全控制。 –