2017-08-11 36 views
0

這裏是我的代碼:如何在RabbitMQ中使用不同的路由密鑰將多條消息發佈到單個隊列?

package pushnotiruntest; 

import com.rabbitmq.client.BuiltinExchangeType; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.io.IOException; 
import java.util.concurrent.TimeoutException; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Send extends Thread { 

    String name = ""; 
    String app_type = ""; 
    private static final String EXCHANGE_NAME = "topic_exchange"; 

    public void run() 
    { 
     ConnectionFactory connFac = new ConnectionFactory(); 
     connFac.setHost("localhost"); 

     try { 

       Connection conn = connFac.newConnection(); 
       Channel channel = conn.createChannel(); 
       channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 
       for(int j=1; j<=20000; j++) 
       { 
        //randomWait(); 

        String routingKey = j+"."+"update"+"."+app_type; 
        String msg = name; 
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8")); 
        System.out.println("Sent " + routingKey + " : " + msg + ""); 
       } 

       channel.close(); 
       conn.close(); 

     } catch (IOException ex) { 
      Logger.getLogger(Send.class.getName()).log(Level.SEVERE, null, ex); 
      System.out.println("Exception1 :--"+ex); 

     } catch (TimeoutException ex) { 
      Logger.getLogger(Send.class.getName()).log(Level.SEVERE, null, ex); 
      System.out.println("Exception 2:--"+ex); 
     } 
    } 

    /*void randomWait() 
    { 
     try { 
      Thread.currentThread().sleep((long)(3*Math.random())); 
     } catch (InterruptedException x) { 
      System.out.println("Interrupted!"); 
     } 
    }*/ 

    public static void main(String[] args) { 
     // TODO code application logic here 

     Send test1 = new Send(); 
     test1.name = "Hello ANDROID"; 
     test1.app_type = "ANDROID"; 
     Send test2 = new Send(); 
     test2.name = "Hello IOS"; 
     test2.app_type = "IOS"; 
     Send test3 = new Send(); 
     test3.name = "Hello WINDOWS"; 
     test3.app_type = "WINDOWS"; 

     test1.start(); 
     test2.start();   
     test3.start(); 
    } 
} 

//javac -cp amqp-client-4.0.2.jar Send.java Recv.java 

//java -cp .;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar Recv 

//java -cp .;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar 
Send 

我用Java編寫代碼(使用消息代理是RabbitMQ的),我想存儲的信息由生產者在一個隊列不同的路由發送鍵。

並且針對與路由密鑰模式相匹配的不同消費者 針對那裏模式獲取該消息。 (我正在使用Topic交換進行模式匹配)。

回答

1

如果你需要兩個消費者,你必須使用兩個隊列。 綁定如果從exchangequeue(s),則無法在訂閱期間決定路由密鑰。

您可以將更多路由密鑰綁定到同一隊列,但不能使用密鑰進行過濾。

我想你需要的東西,如:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 
channel.queueDeclare(QUEUE_NAME_1, true, false, false, null); 
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null); 
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "my.rk.1"); 
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "my.rk.2"); 
channel_consumer_1.basicConsume(QUEUE_NAME_1, false, new DefaultConsumer(channel_consumer) {...} 
.... 
channel_consumer_2.basicConsume(QUEUE_NAME_2, false, new DefaultConsumer(channel_consumer) {...} 
+0

如果我要發送數以千計的郵件到不同的不同的應用程序(如推送通知),以便創建隊列爲每個可能創建隊列數多,這可能難由服務器處理。我想要的是以優先級將消息發佈到隊列中,並且還將消息分別返回給它們的消費者(即根據應用ID)。 –

+0

你必須創建一個隊列設備 – Gabriele

+0

可以請你解釋 –

相關問題