2014-10-03 82 views
1

我使用RabbitMQ來連接部件之間的程序。 RMQ版本(3.3.5)。它與回購的java客戶端一起使用。RabbitMQ訂閱

// Connection part 
@Inject 
public AMQService(RabbitMQConfig mqConfig) throws IOException { 
    this.mqConfig = mqConfig; 
    connectionFactory.setHost(mqConfig.getRABBIT_HOST()); 
    connectionFactory.setUsername(mqConfig.getRABBIT_USERNAME()); 
    connectionFactory.setPassword(mqConfig.getRABBIT_PASSWORD()); 
    connectionFactory.setAutomaticRecoveryEnabled(true); 
    connectionFactory.setPort(mqConfig.getRABBIT_PORT()); 
    connectionFactory.setVirtualHost(mqConfig.getRABBIT_VHOST()); 
    Connection connection = connectionFactory.newConnection(); 
    channel = connection.createChannel(); 
    channel.basicQos(1); 
} 

//Consume part 
private static void consumeResultQueue() { 
    final QueueingConsumer consumer = new QueueingConsumer(channel); 
    Future resultQueue = EXECUTOR_SERVICE.submit((Callable<Object>)() -> { 
     channel.basicConsume("resultQueue", true, consumer); 
     while (true) { 
      try { 
       QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
       String message = new String(delivery.getBody(), "UTF-8"); 
       resultListener.onMessage(message); 
      } catch (IOException | InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 
} 

我想離開使用inifinty循環。當消息可以從隊列中讀取時,RMQ可以通知客戶端嗎?沒有檢查?

回答

2

您可以創建一個延伸DefaultConsumer並覆蓋handleDelivery的類。

public class MyConsumer extends DefaultConsumer { 

    public MyConsumer(Channel channel) { 
     super(channel); 
    } 

    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, 
          AMQP.BasicProperties properties, byte[] body) throws IOException { 
     // do your computation 
    } 
} 

並註冊該消費者與channel.basicConsume(queueName, myConsumerInstance);

注意,通過這樣做,handleDelivery將RabbitMQ的客戶端線程池裏面運行,所以你應該避免這個函數內的任何長的計算。

+0

嗯有趣。我在測試時迴應。謝謝 – 2014-10-03 12:25:42