我不明白SQS QueueListener的工作原理。併發SQS隊列監聽器
這是我的配置:
/**
* AWS Credentials Bean
*/
@Bean
public AWSCredentials awsCredentials() {
return new BasicAWSCredentials(accessKey, secretAccessKey);
}
/**
* AWS Client Bean
*/
@Bean
public AmazonSQS amazonSQSAsyncClient() {
AmazonSQS sqsClient = new AmazonSQSClient(awsCredentials());
sqsClient.setRegion(Region.getRegion(Regions.US_EAST_1));
return sqsClient;
}
/**
* AWS Connection Factory
*/
@Bean
public SQSConnectionFactory connectionFactory() {
SQSConnectionFactory.Builder factoryBuilder = new SQSConnectionFactory.Builder(
Region.getRegion(Regions.US_EAST_1));
factoryBuilder.setAwsCredentialsProvider(new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return awsCredentials();
}
@Override
public void refresh() {
}
});
return factoryBuilder.build();
}
/**
* Registering QueueListener for queueName
*/
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName(queueName);
messageListenerContainer.setMessageListener(new MessageListenerAdapter(new LabQueueListener()));
messageListenerContainer.setErrorHandler(new QueueListenerErrorHandler());
messageListenerContainer.setTaskExecutor(Executors.newFixedThreadPool(3));
return messageListenerContainer;
}
正如你所看到的,我已經配置我DefaultMessageListenerContainer
與Executors.newFixedThreadPool(3)
這樣,我希望在我的隊列監聽3並行任務執行一次。
Thsi是我的監聽器邏輯:
public class QueueListener {
public void handleMessage(String messageContent) {
try {
logger.info(String.format("message received: %s", messageContent));
logger.info("wait 30 sec");
Thread.sleep(1000 * 30);
logger.info("done");
} catch (Throwable th) {
throw new QueueListenerException(messageContent, th);
}
}
}
現在每個handleMessage
方法塊(Thread.sleep(1000 * 30);
)執行30秒,只有1 handleMessage
方法執行一次。
我在做什麼錯? 如何同時實現併發handleMessage
方法調用? 對於目前的配置,我預計會同時執行3 handleMessage
。
如果不增加'maxConcurrentConsumers'什麼將是平行不管你要去一個線程池添加哪種類型的線程池或多麼大的。 –
謝謝,現在一切正常! – alexanoid