2016-05-12 202 views
0

嗨,我想通過 RabbitMQ 發送消息。我在生產者中聲明了一個 direct 交換器,並將它綁定到我的隊列,但消費者收不到消息。以下是我的消費者代碼:

RabbitMQ 消費者沒有收到生產者發送的消息

package com.rabbit.consumer; 

import java.io.IOException; 
import java.util.concurrent.TimeoutException; 

import com.rabbitmq.client.*; 

/**
 * Minimal RabbitMQ consumer: subscribes to {@link #QUEUE_NAME} on a broker at
 * {@code localhost}, prints every delivered message together with its routing
 * key, and acknowledges each delivery manually.
 */
public class Consumer {

    /** Name of the queue this consumer subscribes to. */
    private final static String QUEUE_NAME = "credit1";

    /**
     * Connects to the broker, makes sure the queue exists and registers a
     * consumer on it. The connection is intentionally left open so that
     * deliveries keep arriving until the process is terminated.
     *
     * @param args ignored
     * @throws IOException      if talking to the broker fails
     * @throws TimeoutException if the connection cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Manual acknowledgement: each delivery is acked explicitly in handleDelivery.
        boolean autoAck = false;

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // Declare the queue so that subscribing also works when this process is
        // started before the queue has been created. The arguments (durable,
        // non-exclusive, non-auto-delete, no extra arguments) must match the
        // declaration made by the producer; re-declaring an existing queue with
        // different arguments is rejected by the broker.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
            new DefaultConsumer(channel) {
                /**
                 * Prints the message body (decoded as UTF-8) and its routing key,
                 * then acknowledges this single delivery.
                 */
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'" + "Routing Key : " + routingKey);
                    // multiple = false: acknowledge only this delivery tag.
                    channel.basicAck(deliveryTag, false);
                }

                /** Reports that the channel or connection was shut down and why. */
                @Override
                public void handleShutdownSignal(String consumerTag,
                                                 ShutdownSignalException sig) {
                    System.out.println("Recieving Ended");
                    System.out.println("Channel Closed : " + sig.getMessage());
                }
            });
    }
}

我的生產者代碼如下:

package com.rabbit.producer; 

import java.io.IOException; 
import java.util.concurrent.TimeoutException; 

import com.rabbitmq.client.AMQP.Exchange; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.MessageProperties; 
import com.rabbitmq.client.ReturnListener; 
import com.rabbitmq.client.AMQP.BasicProperties; 



public class Producer { 
    private static final String QUEUE_NAME = "credit1"; 

    public static void main(String[] args) throws IOException, TimeoutException { 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 

     Channel channel = connection.createChannel(); 
     channel.exchangeDeclare("offerExchange", "direct", true); 
     channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
     channel.queueBind(QUEUE_NAME, "offerExchange", "credit"); 
     channel.addReturnListener(new ReturnListener() { 

      public void handleReturn(int replyCode, 
        String replyText, 
        String exchange, 
        String routingKey, 
        BasicProperties properties, 
        byte[] body) throws IOException { 
       // TODO Auto-generated method stub 
       System.out.println("Message Failed" + replyText); 
      } 
     }); 
     String message = "Hello rabbit just hop !"; 
    /* for(int i=1;i<=100000;i++){*/ 
     channel.basicPublish("offerExchange", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 
    /* }*/ 
     System.out.println(" [x] Sent '" + message + "'"); 
     channel.close(); 
     connection.close(); 
    } 
} 

回答

0

好,我找到答案了,錯誤出在這一行:

channel.queueBind(QUEUE_NAME, "offerExchange", "credit"); 

我聲明的隊列名是 credit1,綁定時卻使用了路由鍵 credit。愚蠢的錯誤。