2013-06-13 226 views
2

我是jms的新手。我們的目標是通過將偵聽器實例附加到多個消費者,同時每個消費者使用它自己的會話並在單獨的線程中運行,從而在一個異步偵聽器的onMessage方法中從一個隊列併發處理消息,這樣消息就被傳遞給不同的消費者併發處理。單隊列:多個消費者同時處理消息

1)是否可以通過創建多個使用者來同時處理來自單個隊列的消息? 2)我想出了下面的代碼,但想知道下面的代碼是否看起來正確,我想完成什麼。

public class QueueConsumer implements Runnable, MessageListener { 

public static void main(String[] args) { 




    QueueConsumer consumer1 = new QueueConsumer(); 
    QueueConsumer consumer2 = new QueueConsumer(); 
    try { 
     consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON"); 
     consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON"); 
    } catch (JMSException ex) { 
     ex.printStackTrace(); 
     System.exit(-1); 
    } 


    Thread newThread1 = new Thread(consumer1); 
    Thread newThread2 = new Thread(consumer1); 
    newThread1.start(); 
    newThread2.start(); 



} 


private static String connectionFactoryName = null; 
private static String queueName = null; 


private static ConnectionFactory qcf = null; 
private static Connection queueConnection = null; 


private Session ses = null; 
private Destination queue = null; 
private MessageConsumer msgConsumer = null; 

public static final Logger logger = LoggerFactory 
     .getLogger(QueueConsumer.class); 

public QueueConsumer() { 
    super(); 
} 

public void onMessage(Message msg) { 
    if (msg instanceof TextMessage) { 
     try { 

      //process message 

     } catch (JMSException ex) { 
      ex.printStackTrace(); 

     } 
    } 

} 

public void run() { 

    try { 
     queueConnection.start(); 
    } catch (JMSException e) { 

     e.printStackTrace(); 

     System.exit(-1); 
    } 
    while (!Thread.currentThread().isInterrupted()) { 
     synchronized (this) { 
      try { 
       wait(); 
      } catch (InterruptedException ex) { 
       break; 
      } 
     } 
    } 

} 



public void init(String factoryName, String queue2) throws JMSException { 
    try { 

     qcf = new JMSConnectionFactory(factoryName); 


     queueConnection = qcf.createConnection(); 


     ses = queueConnection.createSession(false, 
       Session.CLIENT_ACKNOWLEDGE); 
     queue = ses.createQueue(queue2); 
     logger.info("Subscribing to destination: " + queue2); 

     msgConsumer = ses.createConsumer(queue); 


     msgConsumer.setMessageListener(this); 

     System.out.println("Listening on queue " + queue2); 

    } catch (Exception e) { 
     e.printStackTrace(); 
     System.exit(-1); 
    } 

} 

private static void setConnectionFactoryName(String name) { 
    connectionFactoryName = name; 
} 

private static String getQueueName() { 
    return queueName; 
} 

private static void setQueueName(String name) { 
    queueName = name; 
} 

}

回答

1
  1. 是絕對
  2. 我只用了一個簡單的介紹一下,我注意到,你傳遞了錯誤的消費者到你的第二個線程:

    Thread newThread2 = new Thread(consumer1); // has to pass consumer2 
    

    這旁邊,一些變量如ConnectionFactory是靜態的並且被多次/重寫初始化。您只需要一個可以創建多個會話和/或消費者的連接。

+0

因此,如果我在main方法本身中創建connectionfactory和連接一次,然後爲消費者創建會話,consumer,listener一次,那麼我猜這就是您所指的? – user2221654

+0

是的,你也應該能夠分享會話,相關:http://stackoverflow.com/questions/4741713/relationship-between-jms-connections-sessions-and-producers-consumers – Dag

+0

因爲會話中的消息是如果我從相同的會話實例創建消費者而不是不同的會話實例,那麼這些消息不會被同時處理,只有一個消費者每次都會得到一條消息,因爲它們屬於同一個會話實例? – user2221654

相關問題