1

我正在建立一個卡夫卡消費者。我已經設置了類似於下面的恢復回調。我已啓用手動提交。我如何在恢復回調方法中確認消息,以免發生延遲。春季卡夫卡消費者手動提交恢復回撥機制

@Bean 
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConcurrency(conncurrency); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setRetryTemplate(retryTemplate()); 
       factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      return null; 
     } 
    }); 
     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

回答

2

您的問題太寬泛。你需要更具體。

在框架中沒有任何假設你在消耗錯誤期間重試耗盡時可以做什麼。

我想你應該從Spring Retry項目開始明白那是什麼RecoveryCallback在所有和它是如何工作:

如果說之前的模板決定中止業務邏輯沒有成功,那麼客戶端給予機會通過恢復回調做一些替代處理。

一個RetryContext有:

/** 
* Accessor for the exception object that caused the current retry. 
* 
* @return the last exception that caused a retry, or possibly null. It will be null 
* if this is the first attempt, but also if the enclosing policy decides not to 
* provide it (e.g. because of concerns about memory usage). 
*/ 
Throwable getLastThrowable(); 

而且春季卡夫卡填充額外的屬性,該屬性RetryContext處理在RecoveryCallback:傳遞到RecoveryCallbackRetryContexthttps://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

內容將取決於監聽器的類型。上下文將始終有一個屬性record,這是發生故障的記錄。如果您的聽衆正在確認和/或消費者意識到,則其他屬性acknowledgment和/或consumer將可用。爲了方便起見,RetryingAcknowledgingMessageListenerAdapter爲這些鍵提供了靜態常量。請參閱其javadoc以獲取更多信息。

+0

謝謝阿爾喬姆。根據最後一條語句,我假設我能夠從retrycontext獲得確認屬性。這怎麼可能被檢索? –

+0

'(確認)retryContext.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)' –

+0

非常感謝! –