我不知道你爲什麼這樣做,但有一個在您的設計問題,請參見下面的註釋1-5,請注意QueueMessageListener
是異步執行的,它可以改變settext.setTextmessage((TextMessage) message);
之前另一個消費者檢索的TextMessage並將其添加到隊列中,這也許是V2更好,但也許使用org.springframework.jms.listener.DefaultMessageListenerContainer是最好的解決辦法:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a producer
// 1- settext.getTextmessage() == null i suppose at this level, see 2- point
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
// 3- you start consumers go to 4, note that you will only consume count messages !!
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//Setting the text message to a setter which takes TextMessage as arg
settext.setTextmessage((TextMessage) message);
// at this point message is considered as delivered if sessionAcknowledgeModeName is AUTO_ACKNOWLEDGE and maybe lost if asynchronous treatment fails
}
}
}
//Problem here unable to produce
class Producer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
Settext settext;
Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext){
this.queue = queue2;
this.settext=settext;
}
public void run() {
System.out.println("Producer Started");
try {
// 2- settext.getTextmessage() == null if block is not executed and thread will sleep and finish
// you have to add this
while (this.settext.getTextmessage() == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
}
if(this.settext.getTextmessage()!=null)
{
//Add to ConcurrentLinkedQueue
queue.add(this.settext.getTextmessage());
}
//}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
// 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish
// you have to add this
while ((str = queue.poll()) == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Removed: " + str);
//}
}
V2:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while ((str = queue.poll()) == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Removed: " + str);
//}
}
V3:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
executor.execute(new Consumer((TextMessage) message));
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
TextMessage textMessage;
public Consumer(TextMessage textMessage) {
this.textMessage = textMessage;
}
public void run() {
System.out.println("Removed: " + str);
}
}
V4:
public static void main(String[] args) throws InterruptedException, JMSException {
new Consumer(queue).start();
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while (true) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
}
while ((str = queue.poll()) == null) {
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
}
}
System.out.println("Removed: " + str);
}
}
}
蔭從ActiveMQ的producer.I發送100個消息試過V2的代碼,但我的System.out.println( 「刪除」 + str);僅打印10次,即從隊列中僅去除10個元素。如果IAM錯誤,請糾正我的錯誤 –
這意味着您的變量計數== 10? –
也許V3更靈活 –