2017-08-01 71 views
0

我已經在RabbitMQ尚未新。我需要爲這些目的MOM系統:在RabbitMQ交易消費

  1. 發行消息被消耗,直到我的邏輯執行 成功。
  2. 代理不必從 隊列中刪除發佈的消息,直到我的邏輯成功執行。

對於這些目標,我寫了下面的代碼在第一次嘗試:

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    final Channel channel = connection.createChannel(); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    boolean autoAck = false; 
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag", 
      new DefaultConsumer(channel) 
      { 
       @Override 
       public void handleDelivery(String consumerTag, 
              Envelope envelope, 
              AMQP.BasicProperties properties, 
              byte[] body)throws IOException 
       { 
        try 
        { 
         channel.txSelect(); 
         String routingKey = envelope.getRoutingKey(); 
         String contentType = properties.getContentType(); 
         long deliveryTag = envelope.getDeliveryTag(); 
         System.out.println("Recieve Message is :" + new String(body)); 
         int reslt = //execute my logic 
         if(result == 0) 
          channel.txCommit(); 
         else 
          channel.txRollback(); 
        } 
        catch(Throwable t) 
        { 
         t.printStackTrace(); 
        } 
       } 
      }); 

通過這種方法,我實現了第二個目的,在別人的話,經紀人不會刪除我的消息,但有一次消耗了隊列中的所有消息,並且所有消息都被回滾,代理不會再向消費者發送消息。

在第二次嘗試,我寫了下面的代碼:

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    final Channel channel = connection.createChannel(); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    boolean autoAck = false; 
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag", 
      new DefaultConsumer(channel) 
      { 
       @Override 
       public void handleDelivery(String consumerTag, 
              Envelope envelope, 
              AMQP.BasicProperties properties, 
              byte[] body)throws IOException 
       { 
        try 
        { 
         String routingKey = envelope.getRoutingKey(); 
         String contentType = properties.getContentType(); 
         long deliveryTag = envelope.getDeliveryTag(); 
         System.out.println("Recieve Message is :" + new String(body)); 
         int reslt = //execute my logic 
         if(result == 0) 
          channel.basicAck(deliveryTag, false); 
         else 
          channel.basicNack(deliveryTag,false,true); 
        } 
        catch(Throwable t) 
        { 
         t.printStackTrace(); 
        } 
       } 
      }); 

通過該解決方案,我要實現這兩個目標,但我不知道我的代碼是否正確?這種方法是否會導致TPS高的生產環境出現問題?我不知道requeue標記basicNack方法是重還是輕?

+2

我已經使用basicAck和basicNack的時間很長,即使高TPS也沒有問題。 –

回答

0

我幾個月前有同樣的要求。我經歷了許多解決方案,這是對我有用的。

我在內存中的某個地方存儲了交付標籤,如果我的邏輯進展順利,我手動確認消息或拒絕該消息。爲此我使用了下面的方法。

if (success) 
connectionModel.BasicAck(Convert.ToUInt64(uTag), false); 
else 
connectionModel.BasicReject(Convert.ToUInt64(uTag), true); 

上面的代碼工作正常,以5000msg /秒的速度生產。那麼,basicNack方法是造成我的問題。