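Please look at my code first. In a multithreaded environment, the order of the messages sent to the RabbitMQ server is not preserved.
This is my test class. It creates 2000 threads, and each of those threads sends a message.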
public class MessageSenderMultipleThreadMock {
    @Autowired
    MessageList message;
    @Autowired
    MessageSender sender;

    public boolean process() throws InterruptedException {
        for (int i = 0; i < 2000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String routingkey = "operation"
                            + UUID.randomUUID().toString();
                    String queueName = UUID.randomUUID().toString();
                    message.setSender(Thread.currentThread().getName());
                    try {
                        sender.sendMessage(routingkey, queueName,
                                "this is message");
                    } catch (InvalidMessagingParameters e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread.sleep(1000);
        }
        Thread.currentThread();
        Thread.sleep(10000);
        return true;
    }
}
MessageSender
This is my main message sender class:
@Service
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageList message;
    String queueName = "";
    String routingKey = "";
    @Autowired
    private QueueCreationService service;
    private boolean messageSentFlag;
    String returnedMessage = "";
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());

    public boolean sendMessage(String routingKey, String queueName,
            String messageToBeSent) throws InvalidMessagingParameters {
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        else {
            this.routingKey = routingKey;
            this.queueName = queueName;
        }
        service.processBinding(queueName, routingKey);
        message.addMessages(messageToBeSent);
        return execute();
    }

    /*
     * overloaded sendMessage method will use requestMap . RequestMap includes
     * queueName and routingKey that controller provides.
     */
    public boolean sendMessage(Map<String, String> requestMap)
            throws MessagingConnectionFailsException,
            InvalidMessagingParameters {
        this.queueName = requestMap.get("queue");
        this.routingKey = requestMap.get("routingkey");
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        service.processBinding(queueName, routingKey);
        preparingMessagingTemplate();
        return execute();
    }

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

    private String convertMessageToJson(MessageList message) {
        ObjectWriter ow = new ObjectMapper().writer()
                .withDefaultPrettyPrinter();
        String json = "";
        try {
            json = ow.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return json;
    }

    private void executeMessageSending() {
        rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
                convertMessageToJson(message), new CorrelationData(UUID
                        .randomUUID().toString()));
    }

    private void preparingMessagingTemplate() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode,
                    String replyText, String exchange, String routingKey) {
                returnedMessage = replyText;
            }
        });
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack,
                    String cause) {
                System.out.println("*" + ack);
                if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
                    messageSentFlag = ack;
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has been successfully delivered");
                } else {
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has not been delivered");
                }
            }
        });
    }
}
My configuration class, which is used for messaging:
@Configuration
@ComponentScan("com.gatcbiotech.blueberry.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {
    String content = "";
    @Value("${rabbitmq_host}")
    String host = "";
    String port = "";
    @Value("${rabbitmq_username}")
    String userName = "";
    @Value("${rabbitmq_password}")
    String password = "";
    String queueName = "";
    InputStream input = null;

    @Autowired
    public MessageConfiguration() {
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    @Scope("prototype")
    public QueueCreationService service() {
        return new QueueCreationService();
    }

    @Bean
    @Scope("prototype")
    public RabbitAdmin admin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                this.host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }
}
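My questions:
- On the server I can see that some threads deliver their messages successfully and others do not.
Also, the rabbitTemplate listener never fires at all (
rabbitTemplate.setReturnCallback(new ReturnCallback() {
I need the listeners to work, because I rely on them every time to decide whether to try sending the message again: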
    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }
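I can see that sometimes a message is delivered 5 times, because messageSentFlag is false and only becomes true inside the confirm listener.
- Please tell me how to delete the queues. I have 8000 of them. I saw a method for deleting a queue in RabbitAdmin, but it needs the queue's name, and my queues just have arbitrary names (UUIDs).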
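To illustrate the queue-name problem: since the names are generated in my own code, one option would be to remember them at creation time so they can be passed to a delete-by-name call later. This is only a rough sketch under that assumption. `QueueNameRegistry`, `newQueueName` and `deleteAll` are made-up names, and the `deleter` argument merely stands in for something like the RabbitAdmin delete method mentioned above.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical helper: remembers every generated queue name. */
class QueueNameRegistry {
    // Concurrent set, because names are generated from many threads.
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    /** Generates a UUID queue name and records it before the queue is declared. */
    String newQueueName() {
        String name = UUID.randomUUID().toString();
        names.add(name);
        return name;
    }

    /**
     * Passes every recorded name to the deleter (a stand-in for a
     * delete-by-name call) and forgets the names that were deleted.
     * Returns how many queues were reported as deleted.
     */
    int deleteAll(Predicate<String> deleter) {
        int deleted = 0;
        for (String name : names) {
            if (deleter.test(name)) {
                names.remove(name); // safe: the key set iterator is weakly consistent
                deleted++;
            }
        }
        return deleted;
    }

    int size() {
        return names.size();
    }

    /** Small demo with a fake deleter that always succeeds. */
    static int demo() {
        QueueNameRegistry registry = new QueueNameRegistry();
        for (int i = 0; i < 3; i++) {
            registry.newQueueName();
        }
        return registry.deleteAll(name -> true);
    }

    public static void main(String[] args) {
        System.out.println("deleted " + demo() + " queues");
    }
}
```

In the test class, `UUID.randomUUID().toString()` for the queue name would be replaced by `registry.newQueueName()`. This does not help with the 8000 queues that already exist, whose names were never recorded.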
Please share your ideas: how can I improve this, or is there a workaround? A multithreaded environment is a must for my application.
Thanks in advance.
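For the retry loop above, what I think I need is a confirmation per message instead of the single shared `messageSentFlag`. Below is a minimal sketch of that idea using only the JDK. `PendingConfirms` and its methods are hypothetical names; the id would be the same string that is passed to `new CorrelationData(...)`, and `onConfirm` would be called from the `confirm(...)` callback. The broker is simulated by a plain thread here.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: one pending confirm per correlation id. */
class PendingConfirms {
    private final Map<String, CompletableFuture<Boolean>> pending =
            new ConcurrentHashMap<>();

    /** Call before publishing; returns the id to use as correlation id. */
    String register() {
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Call from the confirm callback with the id of the confirmed message. */
    void onConfirm(String id, boolean ack) {
        CompletableFuture<Boolean> future = pending.get(id);
        if (future != null) {
            future.complete(ack);
        }
    }

    /** Waits for the confirm of this one message; false on nack or timeout. */
    boolean await(String id, long timeoutMillis) {
        CompletableFuture<Boolean> future = pending.get(id);
        try {
            return future != null
                    && future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            pending.remove(id);
        }
    }

    /** Demo: a second thread plays the role of the broker confirm. */
    static boolean demo(boolean ack) {
        PendingConfirms confirms = new PendingConfirms();
        String id = confirms.register();
        new Thread(() -> confirms.onConfirm(id, ack)).start();
        return confirms.await(id, 2000);
    }

    public static void main(String[] args) {
        System.out.println("acked:  " + demo(true));
        System.out.println("nacked: " + demo(false));
    }
}
```

With something like this, each sending thread would only retry when the confirm for its own message is negative or missing, rather than looking at a flag that any other thread may have changed.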
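I don't see anything here that would let you detect messages being sent out of order. All I can see is a *major* thread-safety problem on the 'MessageList' object. – EJP
@EJP Thanks for your reply. Do you mean I should make MessageList a singleton instead of prototype scope? – Roxy
@EJP: The messages being out of order can only be seen in the RabbitMQ interface. – Roxy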
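To make the thread-safety point from the comments concrete, here is a small sketch of one way to avoid shared mutable state: each task builds its own immutable message object instead of writing into fields (or a `MessageList`) that all threads share. This is an assumption about what a fix could look like, not the posted code. `OutgoingMessage` and `PerCallStateDemo` are invented names, and adding to a queue stands in for the actual send.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical immutable message: all state travels with the call. */
final class OutgoingMessage {
    final String routingKey;
    final String sender;
    final String body;

    OutgoingMessage(String routingKey, String sender, String body) {
        this.routingKey = routingKey;
        this.sender = sender;
        this.body = body;
    }
}

class PerCallStateDemo {
    /**
     * Runs the given number of tasks on a small pool. Each task creates its
     * own message, so no task can overwrite another task's routing key or
     * sender. Returns how many collected messages are internally consistent.
     */
    static int run(int tasks) {
        Queue<OutgoingMessage> sent = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < tasks; i++) {
            final int n = i;
            pool.execute(() -> sent.add( // stand-in for the real send
                    new OutgoingMessage("operation" + n, "worker-" + n,
                            "this is message")));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int consistent = 0;
        for (OutgoingMessage m : sent) {
            String keyId = m.routingKey.substring("operation".length());
            String senderId = m.sender.substring("worker-".length());
            if (keyId.equals(senderId)) {
                consistent++;
            }
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(run(2000) + " consistent messages");
    }
}
```

The same idea applied to `MessageSender` would mean passing the routing key and payload as method parameters all the way down to the send call, rather than storing them in `this.routingKey` and `this.queueName`.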