我在春季啓動應用程序中設置了kafka監聽器,我似乎無法使用執行程序讓監聽器在池中運行。這裏是我的卡夫卡配置:春季卡夫卡監聽器檢測器
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
logger.info("Creating a message processor pool with {} threads", numThreads);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(200);
exec.setMaxPoolSize(200);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);
exec.setQueueCapacity(0); // Yields a SynchronousQueue
exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
return exec;
}
@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(PollerJob.class));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.valueOf(kafkaThreads));
factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
由ThreadPoolTaskExecutor
使用的ThreadFactoryFactory
只是確保線程被命名爲喜歡'kafka-1-processor-1'
。
ConsumerFactory
將ENABLE_AUTO_COMMIT_CONFIG
標誌設置爲false,並且我使用手動模式來執行根據documentation使用執行程序所需的確認。
我的聽衆是這樣的:
@KafkaListener(topics = "my_topic",
group = "my_group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
ack.acknowledge();
logger.info("Running job {}", job.getId());
....
}
使用管理服務器,我可以檢查所有線程和正在創建只有一個kafka-N-processor-N
線程,但我期望看到多達200的作業都運行一個在那一個線程的時間,我不知道爲什麼。
如何使用我的執行程序儘可能多的線程來獲得此設置以運行偵聽器?
我使用Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。
春卡夫卡版本,請問? –
我已經更新了這個問題,但它是Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。謝謝。 –
道歉......我最初描述的行爲是我在最後一次變更之前得到的行爲。目前發生的情況是池中只有一個線程正在創建,並且來自kafka主題的請求正在該線程上串行運行。我仍然需要幫助,找出爲什麼 –