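RabbitMQ: sending and receiving synchronously in a multithreaded application
I have a multithreaded Spring application in which I create a topic exchange, declare queues, and bind them with routing keys. Messages are sent and received synchronously. I can send messages to the topic exchange and see them published to the queues according to the routingKey.
However, when receiving messages, I see that a consumer is registered on the queue on every iteration and is never unregistered. I am creating a QueueingConsumer to receive messages; if there is another way to do the same thing, please let me know. Below is a snippet of the receiveMessage methods.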
public ObjectMessage receiveMessage(final String readQueue, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout, final int readAttempts)
{
    this.configurationLock.lock();
    this.transmissionSemaphore.release(1);
    this.configurationLock.unlock();
    try
    {
        for (int i = 0; i < readAttempts; i++)
        {
            ObjectMessage returnValue = null;
            try
            {
                returnValue = this.receiveMessage(readQueue, correlationId, isBroadcastMessage, readTimeout);
            }
            catch (final Exception e)
            {
                logger.error(e);
            }
            if (returnValue != null)
            {
                logger.warn("Message received from queue - " + readQueue);
                return returnValue;
            }
        }
        if (correlationId != null)
        {
            throw new MessageNotFoundException(correlationId);
        }
        return null;
    }
    finally
    {
        try
        {
            this.transmissionSemaphore.acquire(1);
        }
        catch (final InterruptedException e)
        {
            // Restore the interrupt flag; Thread.interrupted() would clear it instead.
            Thread.currentThread().interrupt();
        }
    }
}
private ObjectMessage receiveMessage(final String routingKey, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout) throws Exception
{
    logger.debug("receiveMessage - routingKey:" + routingKey + ",correlationId:" + correlationId + ",isBroadcastMessage:" + isBroadcastMessage + ",readTimeout:"
        + readTimeout);
    this.configurationLock.lock();
    this.transmissionSemaphore.release(1);
    this.configurationLock.unlock();
    Connection connection = null;
    Channel channel = null;
    QueueingConsumer consumer = null;
    try
    {
        // Binding the topic exchange with queue using routing key
        final String queueName = "clientConfigurationQueue";
        final CachingConnectionFactory cachingConnectionFactory = this.getCachingConnectionFactory(routingKey);
        if (isBroadcastMessage)
        {
            this.declareTopicAmqpInfrastructure(cachingConnectionFactory, routingKey, queueName);
        }
        QueueingConsumer.Delivery delivery;
        connection = cachingConnectionFactory.createConnection();
        channel = connection.createChannel(false);
        consumer = new QueueingConsumer(channel);
        if (correlationId == null)
        {
            channel.basicConsume(queueName, true, consumer);
            delivery = consumer.nextDelivery(readTimeout);
        }
        else
        {
            channel.basicConsume(queueName, false, consumer);
            while (true)
            {
                delivery = consumer.nextDelivery(readTimeout);
                if (delivery != null)
                {
                    // Use a separate name: re-declaring correlationId shadows the parameter and compares it to itself.
                    final String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                    if (correlationId.toString().equals(receivedCorrelationId))
                    {
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        break;
                    }
                    else
                    {
                        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                    }
                }
                else
                {
                    break;
                }
            }
        }
        ObjectMessage objectMessage = null;
        if (delivery != null)
        {
            logger.debug("Message received with correlationId - " + delivery.getProperties().getCorrelationId() + " for queue - " + queueName);
            logger.debug("Message received with Body - " + SerializationUtils.deserialize(delivery.getBody()));
            objectMessage = new ObjectMessage();
            objectMessage.setCorrelationId(delivery.getProperties().getCorrelationId());
            objectMessage.setMessage(delivery.getBody());
        }
        else
        {
            logger.debug("Message not received from queueName - " + queueName);
        }
        return objectMessage;
    }
    catch (final IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e)
    {
        logger.error("Unable to receive message - " + e);
        throw new Exception(e);
    }
    finally
    {
        try
        {
            this.transmissionSemaphore.acquire(1);
        }
        catch (final InterruptedException e)
        {
            // Restore the interrupt flag; Thread.interrupted() would clear it instead.
            Thread.currentThread().interrupt();
        }
        try
        {
            // Close the channel before the connection that owns it.
            if (channel != null)
            {
                channel.close();
            }
            if (connection != null)
            {
                connection.close();
            }
        }
        catch (final Exception ignore)
        {
        }
    }
}
private void declareTopicAmqpInfrastructure(final CachingConnectionFactory cachingConnectionFactory, final String routingKey, String queueName)
{
    final Connection connection = cachingConnectionFactory.createConnection();
    final Channel channel = connection.createChannel(false);
    try
    {
        channel.exchangeDeclare("topicExchange", ExchangeTypes.TOPIC, true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, "topicExchange", routingKey);
    }
    catch (final IOException e)
    {
        logger.error("Unable to declare rabbit queue, exchange and binding - " + e);
    }
    finally
    {
        // Close the channel before the connection that owns it.
        try
        {
            channel.close();
        }
        catch (final IOException ignore)
        {
        }
        connection.close();
    }
}
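A side note on the selective receive in the listing: the requested correlationId is a UUID, while the value read from the message properties is a String, so the two have to be compared as text. Below is a minimal sketch of a null-safe comparison. It assumes the sender stored the ID with UUID.toString(); the class name CorrelationMatch and the method matches are hypothetical and not part of the original code.

```java
import java.util.UUID;

public class CorrelationMatch {

    /**
     * Returns true only when both values are present and the received
     * string equals the canonical text form of the expected UUID.
     * Assumption: the sender wrote the ID using UUID.toString().
     */
    static boolean matches(UUID expected, String received) {
        return expected != null
                && received != null
                && expected.toString().equals(received);
    }

    public static void main(String[] args) {
        UUID expected = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(matches(expected, "123e4567-e89b-12d3-a456-426614174000")); // true
        System.out.println(matches(expected, "another-id"));                           // false
        System.out.println(matches(expected, null));                                   // false
    }
}
```

A message published without a correlation ID then simply counts as a non-match instead of causing a NullPointerException inside the receive loop.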
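What you describe makes no sense to me; please provide more details (stack traces, etc.). Better still, post a sample application that reproduces the problem (for example, as a Gist).
Provided more details about the sample code. – GRaj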
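On the consumers that stay registered: basicConsume registers a consumer on the channel and returns a consumer tag, and that consumer normally remains until basicCancel is called with the tag or the channel is physically closed. If close() on a channel obtained through CachingConnectionFactory only returns it to the cache (an assumption about this setup, not something the question confirms), every receive call would leave one more consumer behind. The sketch below shows the cancel-in-finally pattern. MiniChannel and FakeChannel are hypothetical in-memory stand-ins so that the example runs without a broker; they are not the RabbitMQ client API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumerCancelSketch {

    /** Hypothetical stand-in for the consume/cancel pair of a broker channel. */
    interface MiniChannel {
        String basicConsume(String queue, BlockingQueue<String> sink); // returns the consumer tag
        void basicCancel(String consumerTag);
    }

    /** In-memory fake: tracks registered consumers and hands over pending messages. */
    static class FakeChannel implements MiniChannel {
        final Set<String> activeConsumers = new HashSet<>();
        private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        private int nextTag = 0;

        void publish(String body) {
            pending.add(body);
        }

        @Override
        public String basicConsume(String queue, BlockingQueue<String> sink) {
            String tag = "ctag-" + (nextTag++);
            activeConsumers.add(tag);
            pending.drainTo(sink); // deliver whatever is waiting
            return tag;
        }

        @Override
        public void basicCancel(String consumerTag) {
            activeConsumers.remove(consumerTag);
        }
    }

    /** Receives at most one message, then always unregisters the consumer. */
    static String receiveOnce(MiniChannel channel, String queue, long timeoutMillis) {
        BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
        String consumerTag = channel.basicConsume(queue, deliveries);
        try {
            return deliveries.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        } finally {
            channel.basicCancel(consumerTag); // runs on success, timeout, and interrupt
        }
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        channel.publish("hello");
        System.out.println(receiveOnce(channel, "clientConfigurationQueue", 100)); // hello
        System.out.println(receiveOnce(channel, "clientConfigurationQueue", 100)); // null
        System.out.println(channel.activeConsumers.size());                        // 0
    }
}
```

With the real Java client, the corresponding calls are Channel.basicConsume(queue, autoAck, consumer), which returns the consumer tag, and Channel.basicCancel(consumerTag). For a one-off synchronous read, Channel.basicGet(queue, autoAck) may be a simpler alternative, since it polls the queue without registering a consumer at all.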