2017-07-13 69 views
9

我正在使用Spring-Kafka 1.2.1版本,並且當卡夫卡服務器關閉/不可達時,異步發送呼叫塊一段時間。它似乎是TCP超時。該代碼是這樣的:春季卡夫卡異步發送呼叫塊

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); 
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { 
    @Override 
    public void onSuccess(SendResult<K, V> result) { 
     ... 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     ... 
    } 
}); 

我在春天,卡夫卡代碼採取一個非常快看,它似乎只是傳遞任務一起卡夫卡客戶端庫,翻譯回調互動的未來對象交互。查看kafka客戶端庫,代碼變得更加複雜,我沒有花時間理解這一切,但我想它可能是在同一個線程中進行遠程調用(至少是元數據?)。

作爲一個用戶,我期望Spring-Kafka方法能夠立即返回未來,即使遠程kafka服務器無法訪問。

任何確認,如果我的理解是錯誤的,或者這是一個錯誤,將受到歡迎。目前爲止,我最終使其在異步方面處於異步狀態。

另一個問題是Spring-Kafka文檔在開始時說,它提供了同步和異步發送方法。我找不到任何不會返回期貨的方法,也許文檔需要更新。

如果需要,我很樂意提供任何進一步的細節。謝謝。

回答

1

只是爲了確定。您是否應用了@EnableAsync註釋?我想說這可能是指定未來行爲的關鍵<>

+0

謝謝您的答覆。不,我沒有使用這個註釋,文檔中沒有任何關於它的信息。我會嘗試一下並讓你知道它是否能解決問題。 –

+0

使用@EnableAsync不幸的是沒有改變任何東西=/ –

4

除了配置類上的@EnableAsync註釋之外,在您調用此代碼時需要在方法上使用@Async註釋。

http://www.baeldung.com/spring-async

這裏是一些代碼fragements。卡夫卡製片配置:

@EnableAsync 
@Configuration 
public class KafkaProducerConfig { 

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); 

    @Value("${kafka.brokers}") 
    private String servers; 

    @Bean 
    public Map<String, Object> producerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     return props; 
    } 

    @Bean 
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { 
     return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); 
    } 

    @Bean 
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { 
     return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); 
    } 

    @Bean 
    public Producer producer() { 
     return new Producer(); 
    } 
} 

而且生產者本身:

public class Producer { 

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); 

    @Autowired 
    private KafkaTemplate<String, GenericMessage> kafkaTemplate; 

    @Async 
    public void send(String topic, GenericMessage message) { 
     ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); 
     future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { 

      @Override 
      public void onSuccess(final SendResult<String, GenericMessage> message) { 
       LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(final Throwable throwable) { 
       LOGGER.error("unable to send message= " + message, throwable); 
      } 
     }); 
    } 
} 
+0

謝謝你的迴應。不,我沒有使用這些註釋,文檔中沒有提及這些註釋。我會嘗試並告訴你是否解決問題。 –

+0

使用EnableAsync不幸的是沒有改變任何東西。另外,從鏈接中我明白,它是spring-kafka庫應該使用Async註釋,因爲它爲我提供了未來的對象。 –

+0

我同意你的觀點,對我而言,你提供期貨並不合理,但我必須放置註釋。在我們的案例中,放置這兩個註釋使它像魅力一樣工作。我將編輯添加一些代碼片段的響應。 –