2016-09-11 79 views
0

RabbitMQ隊列中的消息優先級。它與rabbitmq提供的java客戶端一起工作。但它不適用於spring-rabbit依賴。請看一看。Spring AMQP Priority Message

  • RabbitMQ的服務器版本 - 3.6.5
  • 二郎山 - OTP 19(8.0)

使用RabbitMQ的Java客戶端
的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>org.springframework.samples</groupId> 
    <artifactId>RabbitMQ</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 

    <developers> 
     <developer> 
      <name>Sagar Rout</name> 
     </developer> 
    </developers> 

    <properties> 
     <!-- Generic properties --> 
     <java.version>1.8</java.version> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 

     <!-- Spring --> 
     <spring-framework.version>4.3.2.RELEASE</spring-framework.version> 
    </properties> 

    <dependencies> 
     <!-- Spring --> 
     <dependency> 
      <groupId>org.springframework</groupId> 
      <artifactId>spring-context</artifactId> 
      <version>${spring-framework.version}</version> 
     </dependency> 

     <!-- Spring AMQP --> 
     <dependency> 
      <groupId>org.springframework.amqp</groupId> 
      <artifactId>spring-rabbit</artifactId> 
      <version>1.6.1.RELEASE</version> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.5.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

</project> 

出版商。 java

public class Publisher { 

private final static String QUEUE_NAME = "S1_Priority"; 

public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-max-priority", 10); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, args); 
    String message = "Hello World!"; 

    for (int i = 0; i < 10; i++) { 
     channel.basicPublish("", QUEUE_NAME, 
       new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(i).build(), 
       message.getBytes("UTF-8")); 
     System.out.println(" [x] Sent '" + message + "'" + "priority" + i); 
    } 
    channel.close(); 
    connection.close(); 
}} 

Consumer.Java

public class Consumer { 

private final static String QUEUE_NAME = "S1_Priority"; 

public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-max-priority", 10); 
    channel.queueDeclare(QUEUE_NAME, false, false, false, args); 
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

    DefaultConsumer consumer = new DefaultConsumer(channel) { 
     @Override 
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
       byte[] body) throws IOException { 
      String message = new String(body, "UTF-8"); 
      System.out.println(" [x] Received '" + message + "'" + properties.getPriority()); 
     } 
    }; 
    channel.basicConsume(QUEUE_NAME, true, consumer); 
}} 

這是工作和具有較高優先級的消息來了。但它不適用於Spring-rabbit。請找到代碼。
RabbitMQConfig.class

@Configuration 
@ComponentScan(basePackages = { "com.blackocean.*" }) 
@PropertySource("classpath:config.properties") 
public class RabbitMQConfig { 

@Value("${rabbitmq.host}") 
private String host; 

@Value("${rabbitmq.port}") 
private Integer port; 

@Value("${rabbitmq.username}") 
private String username; 

@Value("${rabbitmq.password}") 
private String password; 

@Value("${rabbitmq.connection.size}") 
private Integer connectionSize ; 

@Bean 
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() { 
    return new PropertySourcesPlaceholderConfigurer(); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); 
    cachingConnectionFactory.setHost(host); 
    cachingConnectionFactory.setPort(port); 
    cachingConnectionFactory.setUsername(username); 
    cachingConnectionFactory.setPassword(password); 
    cachingConnectionFactory.setConnectionLimit(connectionSize); 

    return cachingConnectionFactory; 
} 

@Bean 
public RabbitAdmin rabbitAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    return new RabbitTemplate(connectionFactory()); 
} 

@Bean 
public Queue queue() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-priority", 10); 
    Queue queue = new Queue("myQueue", true, false, false, args) ; 
    return queue ; 
}} 

SendUsingJavaConfig

public class Send1UsingJavaConfig { 

/** 
* @param args 
*/ 
public static void main(String[] args) { 

    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); 

     rabbitTemplate.convertAndSend("", "myQueue", "Hi Mr.Ocean 10", new MessagePostProcessor() { 

      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setPriority(9); 
       return message; 
      } 
     }); 
    } 
} 

ReceiveusingJavaConfig

public class RecvUsingJavaConfig { 

public static void main(String[] args) { 
    ApplicationContext context = new AnnotationConfigApplicationContext(RabbitMQConfig.class); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); 

    // Basic Example 
    String message = (String) rabbitTemplate.receiveAndConvert("myQueue"); 
    System.out.println(message); 
}} 

Config.propertie小號

#RabbitMQ 
rabbitmq.host=localhost 
#Always provide port and connection size in numbers 
rabbitmq.port=5672 
rabbitmq.username=guest 
rabbitmq.password=guest 
rabbitmq.connection.size=100 

現在我發送消息與不同的優先級,但它始終按以下順序接收消息。任何建議將是偉大的!

回答

3

這裏只是一個猜測,我試圖尋找我曾經使用過的舊AMQP庫(優先隊列在老版本的Rabbit MQ中)。

優先級設定爲低於

args.put("x-max-priority", 10);,它看起來從args.put("x-priority", 10);略有不同。

您可以參考鏈接中舊的priority queue repo。你可以嘗試看看是否有幫助

+1

是的;在本地代碼中,你有'args.put(「x-max-priority」,10);'但是在Spring代碼中,你錯誤地使用了'args.put(「x-priority」,10);'。 –

+0

謝謝你們。我昨天嘗試了x-max-priority,但沒有成功。今天,當我改變它現在工作。我不知道當時有什麼遺漏。謝啦。 :) – blackOcean