2016-08-14 50 views
0

一旦我收到來自kafka的消息,我需要運行一個長時間運行的進程(最多需要20秒),我只需要將消息視爲成功此過程完成。如何確保消息不會丟失使用彈簧KafkaMessageListenerContainer.java

另外我需要確保每個消息至少處理一次。

使用KafkaMessageListenerContainer具有以下屬性的思維:

  1. ThreadPoolTask​​Executor類爲listenerTaskExecutor

  2. 使用類型AcknowledgingMessageListener

  3. 的一個MessageListener設置應答模式MANUL_IMMEDIATE。

但我唯一的問題是,如果特定的消息抵消說15就成功處理會發生什麼,但仍在處理中有14個消息。所以在這種情況下,我的偏移將更新爲15,即使14尚未處理

如何處理這些類型的情況?

回答

2

你不能那樣做;更高的抵消將被承諾。

如果您使用的是單個分區,您需要在同一個線程上處理每個請求,或者管理應用程序中的狀態以避免發生間隙時提交偏移量。

這是卡夫卡的工作方式。

更簡單的解決方案是對數據進行分區;偏移由分區維護。使用ConcurrentMessageListenerContainer,分區將分佈在線程中;您不能在偵聽器中使用執行程序。這樣,容器可以在處理每個分區時提交偏移量(AckMode.RECORD)。

只需創建至少包含分區數量的主題以滿足您的併發需求 - 但通常會對主題進行過度分區更好。

如果您使用代理分區分配,則應確保將會話超時屬性安全地設置爲大於您預期的最大20秒,以避免分區重新平衡。但是,只要您不使用自動提交,容器將暫停使用者,如果您的監聽器花費太長時間。

+0

感謝加里的迴應。那麼從spring-kafka的角度來看,你是否確保用戶沒有做到我上面所說的(意思是使用threadPoolExecutor作爲listnerTaskExecutor並使用MANUAL_IMMEDIATE模式)? –

+0

不,沒有檢查;手動意味着您完全控制。 –