-1
我有一個需求,其中有兩個主題需要維護1,使用同步方法,其他使用異步方式。 異步按預期調用消費者記錄,但在同步方法中消費者代碼未被調用。當kafka Producer被設置爲同步時,Kafka Consumer沒有被調用
下面是在配置文件中聲明的代碼
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
我已經啓用了自動沖洗真正這裏
@Bean(name="KafkaPayloadSyncTemplate")
public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
}
控制將停止運行後,此後沒有讓消費者任何呼叫recordMetadataResults對象
private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {
final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
KafkaPayload kafkaPayload = constructKafkaPayload();
ListenableFuture<SendResult<String,KafkaPayload>>
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
SendResult<String, KafkaPayload> results;
results = future.get();
recordMetadataResults.add(results.getRecordMetadata());
return recordMetadataResults;
}
消費者代碼
public class KafkaTestListener {
@Autowired
TestServiceImpl TestServiceImpl;
public final CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
TestServiceImpl.consumeKafkaMessage(record);
System.out.println("Acknowledgment : " + acknowledgment);
acknowledgment.acknowledge();
}
}
基於對這個問題,我有2個問題
- 我們應該手動調用listen()監聽器類時,其一個同步製作中。如果是,那該怎麼做?
- 如果偵聽器(
@KafkaListener
)被自動調用,還需要添加哪些其他設置/配置才能使其工作。
感謝您的輸入提前
-Srikant
此屬性已添加到consumerprops中。在我的情況下,消息沒有被消耗。 您的意思是,消費者操作應該工作,而不管我是否宣佈我的生產者類型以同步模式或異步模式工作? – user1564626
同步模式:我期待ack,因此我已經聲明瞭帶有autoFlush的KafkaTemplate設置爲true。異步模式:我調用未來的回調方法。 – user1564626
當然,如果在另一方面有消費者在話題上並不重要。製作人只是發送消息到主題。這種情況下的同步/異步意味着您如何等待確認消息存儲在主題中。這是絕對沒有關於消費者(S)。不知道發生了什麼事。也許你可以分享一些簡單的Spring Boot應用程序,我們會考慮它的缺陷。 –