2017-02-21 42 views
-1

我有一個需求,其中有兩個主題需要維護1,使用同步方法,其他使用異步方式。 異步按預期調用消費者記錄,但在同步方法中消費者代碼未被調用。當kafka Producer被設置爲同步時,Kafka Consumer沒有被調用

下面是在配置文件中聲明的代碼

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); 
props.put(ProducerConfig.RETRIES_CONFIG, 3); 
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 

我已經啓用了自動沖洗真正這裏

@Bean(name="KafkaPayloadSyncTemplate") 
    public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() { 
     return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true); 
} 

控制將停止運行後,此後沒有讓消費者任何呼叫recordMetadataResults對象

private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {  
     final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>(); 
     KafkaPayload kafkaPayload = constructKafkaPayload(); 
     ListenableFuture<SendResult<String,KafkaPayload>> 
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload); 
     SendResult<String, KafkaPayload> results; 
     results = future.get(); 
     recordMetadataResults.add(results.getRecordMetadata());  
     return recordMetadataResults;   
    } 

消費者代碼

public class KafkaTestListener {  
    @Autowired 
    TestServiceImpl TestServiceImpl;  
    public final CountDownLatch countDownLatch = new CountDownLatch(1); 
    @KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup") 
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) { 
     countDownLatch.countDown();  
     TestServiceImpl.consumeKafkaMessage(record);   
     System.out.println("Acknowledgment : " + acknowledgment); 
     acknowledgment.acknowledge();  
    } 
} 

基於對這個問題,我有2個問題

  1. 我們應該手動調用listen()監聽器類時,其一個同步製作中。如果是,那該怎麼做?
  2. 如果偵聽器(@KafkaListener)被自動調用,還需要添加哪些其他設置/配置才能使其工作。

感謝您的輸入提前

-Srikant

回答

1

您應該確保您使用consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");的消費屬性。

不確定你對sync/async的意思,但生產和消費是完全區分的操作。而且你不能影響生產者方面的消費者。因爲之間有卡夫卡經紀人。

+0

此屬性已添加到consumerprops中。在我的情況下,消息沒有被消耗。 您的意思是,消費者操作應該工作,而不管我是否宣佈我的生產者類型以同步模式或異步模式工作? – user1564626

+0

同步模式:我期待ack,因此我已經聲明瞭帶有autoFlush的KafkaTemplate設置爲true。異步模式:我調用未來的回調方法。 – user1564626

+0

當然,如果在另一方面有消費者在話題上並不重要。製作人只是發送消息到主題。這種情況下的同步/異步意味着您如何等待確認消息存儲在主題中。這是絕對沒有關於消費者(S)。不知道發生了什麼事。也許你可以分享一些簡單的Spring Boot應用程序,我們會考慮它的缺陷。 –

相關問題