0
我已經在RabbitMQ尚未新。我需要爲這些目的MOM系統:在RabbitMQ交易消費
- 發行消息被消耗,直到我的邏輯執行 成功。
- 代理不必從 隊列中刪除發佈的消息,直到我的邏輯成功執行。
對於這些目標,我寫了下面的代碼在第一次嘗試:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
channel.txSelect();
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.txCommit();
else
channel.txRollback();
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
通過這種方法,我實現了第二個目的,在別人的話,經紀人不會刪除我的消息,但有一次消耗了隊列中的所有消息,並且所有消息都被回滾,代理不會再向消費者發送消息。
在第二次嘗試,我寫了下面的代碼:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.basicAck(deliveryTag, false);
else
channel.basicNack(deliveryTag,false,true);
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
通過該解決方案,我要實現這兩個目標,但我不知道我的代碼是否正確?這種方法是否會導致TPS高的生產環境出現問題?我不知道requeue標記basicNack方法是重還是輕?
我已經使用basicAck和basicNack的時間很長,即使高TPS也沒有問題。 –