2014-06-18 66 views
0

我有一個多線程的彈簧應用程序,我在這裏創建主題交換,聲明隊列,用路由鍵綁定它們。同步發送和接收消息。我能夠將消息發送到話題交換,並查看使用routingKey將消息發佈到隊列中。Rabbit MQ同步在多線程應用程序中發送和接收

但是,在收到消息時,我看到消費者在每次迭代中都註冊了隊列,並且沒有取消註冊。我正在創建QueueingConsumer接收郵件,可能有另一種方法可以做同樣的事情,請讓我知道。下面是receiveMessage方法的片段。

public ObjectMessage receiveMessage(final String readQueue, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout, final int readAttempts) 
{ 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 
    try 
    { 
     for (int i = 0; i < readAttempts; i++) 
     { 
      ObjectMessage returnValue = null; 
      try 
      { 
       returnValue = this.receiveMessage(readQueue, correlationId, isBroadcastMessage, readTimeout); 
      } 
      catch (final Exception e) 
      { 
       logger.error(e); 
      } 
      if (returnValue != null) 
      { 
       logger.warn("Message received from queue - " + readQueue); 
       return returnValue; 
      } 
     } 
     if (correlationId != null) 
     { 
      throw new MessageNotFoundException(correlationId); 
     } 
     return null; 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 
    } 
} 


private ObjectMessage receiveMessage(final String routingKey, final UUID correlationId, final boolean isBroadcastMessage, final int readTimeout) throws Exception 
{ 
    logger.debug("receiveMessage - routingKey:" + routingKey + ",correlationId:" + correlationId + ",isBroadcastMessage:" + isBroadcastMessage + ",readTimeout:" 
      + readTimeout); 
    this.configurationLock.lock(); 
    this.transmissionSemaphore.release(1); 
    this.configurationLock.unlock(); 

    Connection connection = null; 
    Channel channel = null; 
    QueueingConsumer consumer = null; 
    try 
    { 
     // Binding the topic exchange with queue using routing key 
     final String queueName = "clientConfigurationQueue"; 
     final CachingConnectionFactory cachingConnectionFactory = this.getCachingConnectionFactory(routingKey); 
     if (isBroadcastMessage) 
     { 
      this.declareTopicAmqpInfrastructure(cachingConnectionFactory, routingKey, queueName); 
     } 
     QueueingConsumer.Delivery delivery; 

     connection = cachingConnectionFactory.createConnection(); 
     channel = connection.createChannel(false); 

     consumer = new QueueingConsumer(channel); 

     if (correlationId == null) 
     { 
      channel.basicConsume(queueName, true, consumer); 
      delivery = consumer.nextDelivery(readTimeout); 
     } 
     else 
     { 
      channel.basicConsume(queueName, false, consumer); 
      while (true) 
      { 
       delivery = consumer.nextDelivery(readTimeout); 
       if (delivery != null) 
       { 
        final String correlationId = delivery.getProperties().getCorrelationId(); 

        if (correlationId.equals(correlationId)) 
        { 
         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         break; 
        } 
        else 
        { 
         channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 
        } 
       } 
       else 
       { 
        break; 
       } 
      } 
     } 

     ObjectMessage objectMessage = null; 
     if (delivery != null) 
     { 
      logger.debug("Message received with correlationId - " + delivery.getProperties().getCorrelationId() + " for queue - " + queueName); 
      logger.debug("Message received with Body - " + SerializationUtils.deserialize(delivery.getBody())); 
      objectMessage = new ObjectMessage(); 
      objectMessage.setCorrelationId(delivery.getProperties().getCorrelationId()); 
      objectMessage.setMessage(delivery.getBody()); 
     } 
     else 
     { 
      logger.debug("Message not received from queueName - " + queueName); 
     } 

     return objectMessage; 
    } 
    catch (final IOException | ShutdownSignalException | ConsumerCancelledException | InterruptedException e) 
    { 
     logger.error("Unable to receive message - " + e); 
     throw new Exception(e); 
    } 
    finally 
    { 
     try 
     { 
      this.transmissionSemaphore.acquire(1); 
     } 
     catch (final InterruptedException e) 
     { 
      Thread.interrupted(); 
     } 

     try 
     { 
      if (connection != null) 
      { 
       connection.close(); 
      } 

      if (channel != null) 
      { 
       channel.close(); 
      } 
     } 
     catch (final Exception ignore) 
     { 

     } 
    } 
} 

private void declareTopicAmqpInfrastructure(final CachingConnectionFactory cachingConnectionFactory, final String routingKey, String queueName) 
{ 
    final Connection connection = cachingConnectionFactory.createConnection(); 
    final Channel channel = connection.createChannel(false); 
    try 
    { 
     channel.exchangeDeclare("topicExchange", ExchangeTypes.TOPIC, true, false, null); 
     channel.queueDeclare(queueName, true, false, false, null); 
     channel.queueBind(queueName, "topicExchange", routingKey); 
    } 
    catch (final IOException e) 
    { 
     logger.error("Unable to declare rabbit queue, exchange and binding - " + e); 
    } 
    finally 
    { 
     connection.close(); 
     try 
     { 
      channel.close(); 
     } 
     catch (final IOException ignore) 
     { 

     } 
    } 
} 
+1

你所描述的對我來說毫無意義;請提供更多詳細信息(堆棧跟蹤等)。如果您可以發佈重現該應用的示例應用(例如,在Gist上),那會更好。 –

+0

提供有關示例代碼的更多詳細信息 – GRaj

回答

0

通過您的編輯,您完全改變了您的問題;你原來的問題說你掛在createConnection()。如果你使用Spring AMQP,你爲什麼不使用它的更高級別的抽象?您永遠不會取消您的消費者 - 您需要跟蹤basicConsume返回的consumerTag,並在完成後使用basicCancel取消。

+0

對於完全改變問題感到抱歉,因爲我幾乎無法理解與主題和扇出交換相關的AMQP概念,因此產生了一些混亂。 通過更高級別的抽象,你的意思是使用RabbitTemplate發送和接收方法? – GRaj

+0

是的,和異步接收的消息偵聽器容器。 –

+0

消費者名單立即下降。非常感謝Gary。你一直是一位偉大的導師。 – GRaj

相關問題