2015-08-31 75 views
3

我不明白SQS QueueListener的工作原理。併發SQS隊列監聽器

這是我的配置:

/** 
    * AWS Credentials Bean 
    */ 
    @Bean 
    public AWSCredentials awsCredentials() { 
     return new BasicAWSCredentials(accessKey, secretAccessKey); 
    } 

    /** 
    * AWS Client Bean 
    */ 
    @Bean 
    public AmazonSQS amazonSQSAsyncClient() { 
     AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials()); 
     sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1)); 
     return sqsClient; 
    } 

    /** 
    * AWS Connection Factory 
    */ 
    @Bean 
    public SQSConnectionFactory connectionFactory() { 
     SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
       Region.getRegion(Regions.US_EAST_1)); 
     factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() { 

      @Override 
      public AWSCredentials getCredentials() { 
       return awsCredentials(); 
      } 

      @Override 
      public void refresh() { 
      } 

     }); 
     return factoryBuilder.build(); 
    } 

    /** 
    * Registering QueueListener for queueName 
    */ 
    @Bean 
    public DefaultMessageListenerContainer defaultMessageListenerContainer() { 
     DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); 
     messageListenerContainer.setConnectionFactory(connectionFactory()); 
     messageListenerContainer.setDestinationName(queueName); 
     messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener())); 
     messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler()); 
     messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3)); 

     return messageListenerContainer; 
    } 

正如你所看到的,我已經配置我DefaultMessageListenerContainerExecutors.newFixedThreadPool(3)

這樣,我希望在我的隊列監聽3並行任務執行一次。

Thsi是我的監聽器邏輯:

public class QueueListener { 

    public void handleMessage(String messageContent) { 
     try { 
      logger.info(String.format("message received: %s", messageContent)); 
      logger.info("wait 30 sec"); 
      Thread.sleep(1000 * 30); 
      logger.info("done"); 
     } catch (Throwable th) { 
      throw new QueueListenerException(messageContent, th); 
     } 
    } 
} 

現在每個handleMessage方法塊(Thread.sleep(1000 * 30);)執行30秒,只有1 handleMessage方法執行一次。

我在做什麼錯? 如何同時實現併發handleMessage方法調用? 對於目前的配置,我預計會同時執行3 handleMessage

+1

如果不增加'maxConcurrentConsumers'什麼將是平行不管你要去一個線程池添加哪種類型的線程池或多麼大的。 –

+0

謝謝,現在一切正常! – alexanoid

回答

1

可以添加參數通過添加messageListenerContainer.setConcurrency("3-10");以處理豆爲DefaultMessageListenerConfigurator併發執行這意味着將開始與3個線程和擴大至10
concurrentConsumers的數量也可以通過使用messageListenerContainer.setConcurrentConsumers(3);

可以替代地設置

參見:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrency-java.lang.String-