0

請先查看我的代碼。在多線程環境中沒有修復發送到Rabbitmq服務器的消息的順序環境

這是我的測試類這是創建2000線程和那些線程正在發送消息。

public class MessageSenderMultipleThreadMock { 
    @Autowired 
    MessageList message; 
    @Autowired 
    MessageSender sender; 

    public boolean process() throws InterruptedException { 

     for (int i = 0; i < 2000; i++) { 
      new Thread(new Runnable() { 

       @Override 
       public void run() { 

        String routingkey = "operation" 
          + UUID.randomUUID().toString(); 
        String queueName = UUID.randomUUID().toString(); 

        message.setSender(Thread.currentThread().getName()); 
        try { 
         sender.sendMessage(routingkey, queueName, 
           "this is message"); 
        } catch (InvalidMessagingParameters e) { 
         e.printStackTrace(); 
        } 

       } 
      }).start(); 
      Thread.sleep(1000); 

     } 
     Thread.currentThread(); 
     Thread.sleep(10000); 
     return true; 
    } 
} 

消息發送者

這是我的主消息發送者類

@Service 
public class MessageSender { 

    @Autowired 
    private RabbitTemplate rabbitTemplate; 
    @Autowired 
    private MessageList message; 
    String queueName = ""; 
    String routingKey = ""; 
    @Autowired 
    private QueueCreationService service; 
    private boolean messageSentFlag; 
    String returnedMessage = ""; 
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName()); 

    public boolean sendMessage(String routingKey, String queueName, 
      String messageToBeSent) throws InvalidMessagingParameters { 
     if ((routingKey == null && queueName == null) 
       || (routingKey.equalsIgnoreCase("") || queueName 
         .equalsIgnoreCase(""))) 
      throw new InvalidMessagingParameters(routingKey, queueName); 

     else { 
      this.routingKey = routingKey; 
      this.queueName = queueName; 
     } 
     service.processBinding(queueName, routingKey); 
     message.addMessages(messageToBeSent); 
     return execute(); 
    } 

    /* 
    * overloaded sendMessage method will use requestMap . RequestMap includes 
    * queueName and routingKey that controller provides. 
    */ 
    public boolean sendMessage(Map<String, String> requestMap) 
      throws MessagingConnectionFailsException, 
      InvalidMessagingParameters { 
     this.queueName = requestMap.get("queue"); 
     this.routingKey = requestMap.get("routingkey"); 
     if ((routingKey == null && queueName == null) 
       || (routingKey.equalsIgnoreCase("") || queueName 
         .equalsIgnoreCase(""))) 
      throw new InvalidMessagingParameters(routingKey, queueName); 
     service.processBinding(queueName, routingKey); 
     preparingMessagingTemplate(); 
     return execute(); 
    } 

    private boolean execute() { 
     for (int i = 0; i < 5 && !messageSentFlag; i++) { 
      executeMessageSending(); 
     } 
     return messageSentFlag; 
    } 

    private String convertMessageToJson(MessageList message) { 
     ObjectWriter ow = new ObjectMapper().writer() 
       .withDefaultPrettyPrinter(); 
     String json = ""; 
     try { 
      json = ow.writeValueAsString(message); 
     } catch (JsonProcessingException e) { 
      e.printStackTrace(); 
     } 
     return json; 
    } 

    private void executeMessageSending() { 
     rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey, 
       convertMessageToJson(message), new CorrelationData(UUID 
         .randomUUID().toString())); 

    } 

    private void preparingMessagingTemplate() { 
     rabbitTemplate.setMandatory(true); 
     rabbitTemplate.setReturnCallback(new ReturnCallback() { 
      @Override 
      public void returnedMessage(Message message, int replyCode, 
        String replyText, String exchange, String routingKey) { 
       returnedMessage = replyText; 
      } 
     }); 
     rabbitTemplate.setConfirmCallback(new ConfirmCallback() { 
      @Override 
      public void confirm(CorrelationData correlationData, boolean ack, 
        String cause) { 
       System.out.println("*" + ack); 

       if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) { 
        messageSentFlag = ack; 
        log.info("message " + message.toString() 
          + " from Operation +" + this.getClass().getName() 
          + "+ has been successfully delivered"); 
       } else { 
        log.info("message " + message.toString() 
          + " from Operation +" + this.getClass().getName() 
          + "+ has not been delivered"); 

       } 
      } 
     }); 
    } 
} 

我的配置類其用於通過消息傳遞

@Configuration 
    @ComponentScan("com.gatcbiotech.blueberry.*") 
    @PropertySource("classpath:application.properties") 

public class MessageConfiguration { 

    String content = ""; 
    @Value("${rabbitmq_host}") 
    String host = ""; 
    String port = ""; 
    @Value("${rabbitmq_username}") 
    String userName = ""; 
    @Value("${rabbitmq_password}") 
    String password = ""; 
    String queueName = ""; 
    InputStream input = null; 

    @Autowired 
    public MessageConfiguration() { 
    } 

    @Bean 
    @Scope("prototype") 
    public RabbitTemplate rabbitTemplate() { 
     RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
     return template; 
    } 

    @Bean 
    @Scope("prototype") 
    public QueueCreationService service() { 
     return new QueueCreationService(); 
    } 

    @Bean 
    @Scope("prototype") 
    public RabbitAdmin admin() { 
     return new RabbitAdmin(connectionFactory()); 
    } 

    @Bean 
    public ConnectionFactory connectionFactory() { 
     CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
       this.host); 
     connectionFactory.setUsername(userName); 
     connectionFactory.setPassword(password); 
     connectionFactory.setPublisherConfirms(true); 
     connectionFactory.setPublisherReturns(true); 
     return connectionFactory; 
    } 

} 

我的問題:

  1. ,我可以在服務器上看到一些線程是成功交付的郵件和其他不。

  2. 也完全沒rabbitTemplate監聽器(

    rabbitTemplate.setReturnCallback(新ReturnCallback(){

我需要監聽的工作,因爲每次在這個基礎上我會嘗試發送消息的確定性再次

private boolean execute() { 
    for (int i = 0; i < 5 && !messageSentFlag; i++) { 
     executeMessageSending(); 
    } 
    return messageSentFlag; 
} 

我可以看到有時消息正在交付5次,因爲messageSe ntFlag是錯誤的,只有在確認偵聽器中才會變爲true。

  1. 請告訴我如何刪除隊列?因爲我有8000個隊列,我在rabbitAdmin看到了一個刪除隊列的方法,但它需要隊列的名字,我的隊列只是任意的隊列(UUID)

請向我提供您的想法,我該如何改進或有任何解決方法? 對於我的應用程序來說,多線程環境是必須的。

在此先感謝。

+1

我在這裏沒有看到任何東西,可以讓您檢測到無序發送的消息。我所能看到的是'MessageList'對象上的一個* major *線程安全問題。 – EJP

+0

@EJP感謝您的回覆。你的意思是我應該使MessageList成爲Singelton而不是原型範圍? – Roxy

+0

@EJP:郵件發生故障,只能在Rabbitmq接口上看到。 – Roxy

回答

2

一旦消息在特定的隊列中,RabbitMQ只保證消息順序。

除非您將這些保證置於正確的位置,否則不保證向RabbitMQ發送消息的消息順序。在許多情況下,這是一件困難的事,即使不是不可能的事情 - 特別是在像您這樣的多線程環境中。

如果你需要保證消息按照一定的順序進行處理,你需要看建築或使用resequencer

總的想法是,你需要在源頭上號碼您的信息 - 1,2, 3,4,5等。當消費者將消息從隊列中拉出時,您會查看消息編號,看看這是否是您現在需要的消息編號。如果不是的話,你會留下信息並稍後處理。一旦您收到目前正在查找的消息#,您將按順序處理您當前保存的所有消息。

春天應該有像resequencer可用的東西,雖然我不熟悉與該生態系統指向你在正確的方向。