2016-10-19 73 views
1

我正在使用 ActiveMQ 編寫應用程序,並透過異步的 onMessage() 方法從 ActiveMQ 接收消息。假設我從 ActiveMQ 收到 1000 條消息,所有消息都應在 onMessage() 方法中存入 ConcurrentLinkedQueue,再由一個線程從 ConcurrentLinkedQueue 中取出。但我遇到的問題是:我無法在 ConcurrentLinkedQueue 中添加或修改任何一條消息。onMessage() 中收到的 TextMessage 會傳給一個接受 TextMessage 的 setter 方法,但我從對應的 getter 方法取不到任何內容。爲什麼會這樣?該如何避免?(標題:ActiveMQ onMessage() 方法阻止其他線程)

代碼片段如下:

/**
 * Entry point of the question's sample.
 *
 * Order of operations: start one Producer thread, create a thread pool, submit
 * {@code count} Consumer tasks, and only then register the JMS message listener.
 * {@code queue}, {@code settext}, {@code count} and {@code consumer} are not
 * declared in this snippet; they are expected to exist in the enclosing class.
 */
public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
     // The producer thread is started before any listener is registered.
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     // NOTE(review): the work queue is an unbounded LinkedBlockingQueue, so per the
     // ThreadPoolExecutor documentation the pool should never grow past the core size of 4.
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     // Let idle core threads time out after the 100 s keep-alive as well.
     executor.allowCoreThreadTimeOut(true); 

     // Submit the Consumer tasks; they start polling the queue before the listener below exists.
     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     // The next line is a placeholder from the original post (wrapped in markdown bold
     // markers), not Java code: the ActiveMQ connection/session/consumer setup goes here.
     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     // Messages are delivered asynchronously to QueueMessageListener.onMessage from here on.
     consumer.setMessageListener(new QueueMessageListener()); 
     // Stops accepting new tasks; tasks submitted above still run to completion.
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener {

    /**
     * Called for every delivered message: casts it to a TextMessage and hands
     * it to the shared {@code settext} holder through its setter.
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
    }
}
    } 

//Problem here unable to produce 
/**
 * One-shot task: if the shared Settext holder currently has a message, copy it
 * into the queue; then pause 200 ms and finish. It does not wait for a message
 * to arrive.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.settext = settext;
        this.queue = queue2;
    }

    public void run() {
        System.out.println("Producer Started");
        try {
            // The holder is read once for the check and once more for the add.
            final boolean messageAvailable = settext.getTextmessage() != null;
            if (messageAvailable) {
                queue.add(settext.getTextmessage());
            }
            Thread.sleep(200);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) != null) { 
      System.out.println("Removed: " + str); 

     } 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     //} 
    } 

回答

2

我不清楚你爲什麼要這樣設計,但你的設計存在問題,請參見下面代碼中的註釋 1-5。請注意,QueueMessageListener 是異步執行的:在其他線程取出上一條 TextMessage 並將其加入隊列之前,它就可能再次調用 settext.setTextmessage((TextMessage) message); 把該消息覆蓋掉。因此 V2 也許更好,不過使用 org.springframework.jms.listener.DefaultMessageListenerContainer 可能纔是最好的解決辦法:

/**
 * Annotated copy of the question's entry point; the numbered notes (1, 3) mark
 * the steps that interact with the waits added to Producer.run() and Consumer.run().
 */
public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
// 1- At this point settext.getTextmessage() is presumably still null, because the
//    listener is only registered further down; see note 2 in Producer.run().
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     // 3- The consumers are started here (see note 4 in Consumer.run()). Each Consumer
     //    task removes a single message, so only `count` messages are consumed in total.
     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     // The next line is a placeholder from the original post (wrapped in markdown bold
     // markers), not Java code: the ActiveMQ connection/session/consumer setup goes here.
     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     // Stops accepting new tasks; the Consumer tasks submitted above still run.
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener {

    /**
     * Called for every delivered message: casts it to a TextMessage and hands
     * it to the shared {@code settext} holder through its setter.
     */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        settext.setTextmessage(received);
        // Once this method returns, the message counts as delivered when the session
        // uses AUTO_ACKNOWLEDGE, so it may be lost if the asynchronous processing
        // that follows fails.
    }
}
    } 

//Problem here unable to produce 
/**
 * Waits until the shared Settext holder contains a message, moves that one
 * message into the queue, and finishes.
 *
 * The holder is read once per iteration and the value that passed the null
 * check is the value that is enqueued, so a concurrent setter call cannot slip
 * in between the check and the add.
 */
class Producer implements Runnable {

    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.queue = queue2;
        this.settext = settext;
    }

    @Override
    public void run() {
        System.out.println("Producer Started");
        try {
            // 2- The holder is still empty when this thread starts (the listener is
            //    registered later), so poll it every 500 ms until a message shows up.
            TextMessage pending;
            while ((pending = this.settext.getTextmessage()) == null) {
                Thread.sleep(500);
            }
            //Add to ConcurrentLinkedQueue
            queue.add(pending);
        } catch (InterruptedException ex) {
            // Stop waiting when asked to, and keep the interrupt flag set for the caller.
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     // 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish 
    // you have to add this 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V2:

/**
 * V2 entry point: no Producer thread. The listener puts messages straight into
 * {@code queue}, and {@code count} Consumer tasks take them out.
 * {@code queue}, {@code count} and {@code consumer} are not declared in this snippet.
 */
public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     // Each Consumer task prints one message and ends, so `count` bounds the total consumed.
     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     // The next line is a placeholder from the original post (wrapped in markdown bold
     // markers), not Java code: the ActiveMQ connection/session/consumer setup goes here.
     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     // Stops accepting new tasks; the Consumer tasks submitted above still run.
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener {

    /** Appends every delivered message, cast to TextMessage, to the shared queue. */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}
    } 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V3:

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener {

    /** Wraps every delivered message in a Consumer task and submits it to the executor. */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        final Consumer task = new Consumer(received);
        executor.execute(task);
    }
}
    } 

//Problem here unable to consume 
/**
 * Task that processes exactly one message: the TextMessage it was constructed with.
 */
class Consumer implements Runnable {
    TextMessage textMessage;

    /**
     * @param textMessage the message this task will print when run
     */
    public Consumer(TextMessage textMessage) {
        this.textMessage = textMessage;
    }

    @Override
    public void run() {
        // Print the message held by this task (the original referenced an undefined `str`).
        System.out.println("Removed: " + textMessage);
    }
}

V4:

public static void main(String[] args) throws InterruptedException, JMSException { 

    new Consumer(queue).start(); 

    **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

    consumer.setMessageListener(new QueueMessageListener()); 
    executor.shutdown(); 
} 

private static class QueueMessageListener implements MessageListener {

    /** Appends every delivered message, cast to TextMessage, to the shared queue. */
    @Override
    public void onMessage(Message message) {
        final TextMessage received = (TextMessage) message;
        queue.add(received);
    }
}

//Problem here unable to consume 
/**
 * Long-running task that keeps removing messages from the queue and printing
 * them until its thread is interrupted.
 *
 * Messages that are already queued are processed back to back; the task only
 * sleeps (500 ms) when the queue is empty.
 */
class Consumer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
        this.queue = queue2;
    }

    @Override
    public void run() {
        System.out.println("Consumer Started");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                TextMessage str = queue.poll();
                if (str == null) {
                    // Nothing to do yet: back off before polling again.
                    Thread.sleep(500);
                } else {
                    System.out.println("Removed: " + str);
                }
            }
        } catch (InterruptedException ex) {
            // Interruption is the stop signal; keep the flag set and end the task.
            Thread.currentThread().interrupt();
        }
    }
}
+0

我從 ActiveMQ 的 producer 發送了 100 條消息。我試過 V2 的代碼,但 System.out.println("Removed: " + str); 只打印了 10 次,也就是說只從隊列中取出了 10 個元素。如果我哪裡做錯了,請指正。 –

+0

這是否意味着你的變量 count == 10? –

+0

也許V3更靈活 –

相關問題