1

我正在建設一個春季卡夫卡消費者。我已經設置了重試機制。重試完畢後,我想將失敗的消息推送給死信主題。Kafka Consumer - 收聽方收到的參數恢復方法

Listen方法具有以下參數

public void listen(@Payload Map<String, Object> conciseMap, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     Acknowledgment ack) throws JsonProcessingException { 

作爲恢復方法的一部分,我想取conciseMap輸入到聽者的地圖或者是由我的主題接收到的原始信息傳遞。有沒有辦法做到這一點?

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConcurrency(conncurrency); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setRetryTemplate(retryTemplate()); 
    factory.setRecoveryCallback(new RecoveryCallback<Object>() { 
     @Override 
     public Object recover(RetryContext context) throws Exception { 
      // TODO Auto-generated method stub 
      logger.debug(" In recovery callback method !!"); 
      ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge(); 

      return null; 
     } 
    }); 
    factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
    return factory; 
} 

     factory.getContainerProperties().setAckMode(AckMode.MANUAL); 
     return factory; 
    } 

    /* 
    * Retry template. 
    */ 

    protected RetryPolicy retryPolicy() { 
     SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions); 
     return policy; 
    } 

    protected BackOffPolicy backOffPolicy() { 
     ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy(); 
     policy.setInitialInterval(initialRetryInterval); 
     policy.setMultiplier(retryMultiplier); 
     return policy; 
    } 

    protected RetryTemplate retryTemplate() { 
     RetryTemplate template = new RetryTemplate(); 
     template.setRetryPolicy(retryPolicy()); 
     template.setBackOffPolicy(backOffPolicy()); 
     return template; 
    } 
} 

回答

1

你不能得到,在RecoveryCallbackRetryContext轉換conciseMap,但你可以檢索ConsumerRecord是從主題原始轉換前:

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD) 
+0

由於阿爾喬姆。 ConsumerRecord.value是否提供我們在偵聽器方法中獲得的字節? –

+0

M-m-m。可能。這是提供'Deserializer'的結果 –