2017-08-03 121 views
1

我在春季啓動應用程序中設置了kafka監聽器,我似乎無法使用執行程序讓監聽器在池中運行。這裏是我的卡夫卡配置:春季卡夫卡監聽器檢測器

@Bean 
ThreadPoolTaskExecutor messageProcessorExecutor() { 
    logger.info("Creating a message processor pool with {} threads", numThreads); 
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); 
    exec.setCorePoolSize(200); 
    exec.setMaxPoolSize(200); 
    exec.setKeepAliveSeconds(30); 
    exec.setAllowCoreThreadTimeOut(true); 
    exec.setQueueCapacity(0); // Yields a SynchronousQueue 
    exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor")); 
    return exec; 
} 

@Bean 
public ConsumerFactory<String, PollerJob> consumerFactory() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props, 
      new StringDeserializer(), 
      new JsonDeserializer<>(PollerJob.class)); 
    return factory; 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory 
      = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setConcurrency(Integer.valueOf(kafkaThreads)); 
    factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor()); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); 
    return factory; 
} 

ThreadPoolTaskExecutor使用的ThreadFactoryFactory只是確保線程被命名爲喜歡'kafka-1-processor-1'

ConsumerFactoryENABLE_AUTO_COMMIT_CONFIG標誌設置爲false,並且我使用手動模式來執行根據documentation使用執行程序所需的確認。

我的聽衆是這樣的:

@KafkaListener(topics = "my_topic", 
     group = "my_group", 
     containerFactory = "kafkaListenerContainerFactory") 
public void listen(@Payload SomeJob job, Acknowledgment ack) { 
    ack.acknowledge(); 
    logger.info("Running job {}", job.getId()); 
    .... 
} 

使用管理服務器,我可以檢查所有線程和正在創建只有一個kafka-N-processor-N線程,但我期望看到多達200的作業都運行一個在那一個線程的時間,我不知道爲什麼。

如何使用我的執行程序儘可能多的線程來獲得此設置以運行偵聽器?

我使用Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。

+0

春卡夫卡版本,請問? –

+0

我已經更新了這個問題,但它是Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。謝謝。 –

+0

道歉......我最初描述的行爲是我在最後一次變更之前得到的行爲。目前發生的情況是池中只有一個線程正在創建,並且來自kafka主題的請求正在該線程上串行運行。我仍然需要幫助,找出爲什麼 –

回答

1

如果您的主題只有一個分區,則根據使用者組策略,只有一個使用者能夠輪詢該分區。

ConcurrentMessageListenerContainer實際上創建儘可能多的目標KafkaMessageListenerContainer提供的實例concurrency。只有在它不知道主題中的分區數量的情況下才這樣做。

當消費羣組中的重新平衡發生時,只有一個消費者獲取分區進行消費。所有的工作都是在單線程中完成的:

private void startInvoker() { 
    ListenerConsumer.this.invoker = new ListenerInvoker(); 
    ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor() 
      .submit(ListenerConsumer.this.invoker); 
} 

一個分區 - 一個用於順序記錄處理的線程。

+0

感謝您的答覆。這對我來說似乎是不正確的行爲。我知道一個分區只能有一個給定組中的一個消費者,並且如果我要將聽衆從消費者線程中解放出來並親自處理,我必須自己處理這個問題。我沒有得到的是爲什麼聽衆的線索必須與消費者的線索緊密聯繫。 Spring已經在消費者線程和偵聽器線程之間做了區分,爲什麼我不能像我想要的那樣設置儘可能多的偵聽器線程,即使根據我的分區只能有一個消費者線程? –

+0

那麼,Spring Kafka希望儘可能地接近Apache Kafka解決方案。由於缺乏適當的心跳,當前版本將該線程從消費者線程轉換爲監聽線程。在正在開發的2.0版本中,我們已經移除了已經監聽的線程,並直接在消費者線程中完成所有的處理 - 卡夫卡已經修復了心跳問題。我們這樣做只是因爲這是Apache Kafka的建議。您可以隨意在聽衆下游有任何線程模型。這已經不是'KafkaListenerContainer'責任使事情複雜化。 –

+0

夠公平的。那麼,我認爲答案只是「你不能這麼做」,這是一個很好的答案,即使它不是我想聽到的答案。再次感謝。 –