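I am trying to consume a JMS queue using multiple threads. I know that each thread should have its own JMS session, and I have done that in my code as shown below. However, I get a strange exception when consuming JMS with a multi-threaded client.
Here is the exception stack trace: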
javax.jms.IllegalStateException: Forbidden call on a closed connection.
at org.objectweb.joram.client.jms.Connection.checkClosed(Connection.java:404)
at org.objectweb.joram.client.jms.Connection.createSession(Connection.java:530)
at MessageWorker.run(ReceiveJmsDemoMultiThreaded.java:96)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
I need your help, because this is my ...
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ReceiveJmsDemoMultiThreaded {

    public static void main(String[] args) {
        Context context = null;
        ConnectionFactory factory = null;
        Connection connection = null;
        Destination destination = null;

        try {
            context = getInitialContext();
            factory = (QueueConnectionFactory) context.lookup("JQCF");
            destination = (Destination) context.lookup("sampleQueue");
            connection = factory.createConnection();

            final ExecutorService executor = Executors.newCachedThreadPool();
            executor.submit(new MessageWorker(connection, destination));
            executor.submit(new MessageWorker(connection, destination));
            executor.submit(new MessageWorker(connection, destination));
            executor.submit(new MessageWorker(connection, destination));

            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (context != null) {
                try {
                    context.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static InitialContext getInitialContext() throws NamingException {
        Properties prop = new Properties();
        prop.put("java.naming.provider.url", "rmi://localhost:1099");
        prop.put("java.naming.factory.initial",
                "org.objectweb.carol.jndi.spi.MultiOrbInitialContextFactory");
        return new InitialContext(prop);
    }
}

class MessageWorker extends Thread {
    Connection connection = null;
    Destination dest = null;
    Session session = null;
    Destination destination = null;

    public MessageWorker(Connection connection, Destination dest) {
        this.connection = connection;
        this.destination = dest;
    }

    @Override
    public void run() {
        try {
            MessageConsumer receiver = null;
            System.out.println("Starting Thread " + currentThread().getName());
            while (true) {
                try {
                    System.out.println("Waiting for next msg " + currentThread().getName());
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    receiver = session.createConsumer(destination);
                    Message msg = receiver.receive();
                    if (msg instanceof Message && msg != null) {
                        System.out.println("STARTING consuming " + msg.toString() + " by thread " + currentThread().getName());
                        Thread.sleep(2000); // some work here
                        System.out.println("ENDING consuming " + msg.toString() + " by thread " + currentThread().getName());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}
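Thanks a lot.
You should start the connection before creating the MessageWorker instances. – user1516873
Also, by the way, the way you receive messages is odd. Usually you would do this by implementing the javax.jms.MessageListener interface. – user1516873
@user1516873: I tried starting the connection before creating the MessageWorker instances, but I still get exactly the same error :( Implementing javax.jms.MessageListener is for receiving messages asynchronously, but I need to consume messages synchronously, which is why I use the method javax.jms.MessageConsumer.receive(). Thanks for your help – vonemya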
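One possible reading of the stack trace, offered as a guess rather than a confirmed diagnosis: main reaches its finally block right after submitting the workers, so connection.close() can run while the workers are still calling createSession. The sketch below illustrates that ordering issue with plain java.util.concurrent only. FakeConnection, runWorkers and ConnectionLifecycleSketch are hypothetical stand-ins invented for this illustration, not JMS or Joram classes, and the workers here do a single bounded task instead of looping forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConnectionLifecycleSketch {

    /** Hypothetical stand-in for a JMS Connection; not a real JMS class. */
    static class FakeConnection implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        /** Mimics createSession(): fails once the connection has been closed. */
        String createSession(String owner) {
            if (closed.get()) {
                throw new IllegalStateException("Forbidden call on a closed connection.");
            }
            return "session-for-" + owner;
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Runs the workers and closes the connection only after all of them have finished. */
    static int runWorkers(int workerCount) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);
        FakeConnection connection = new FakeConnection();
        int succeeded = 0;
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < workerCount; i++) {
                final String name = "worker-" + i;
                // Each worker creates its own session once, on its own thread.
                Callable<String> task = () -> connection.createSession(name);
                results.add(executor.submit(task));
            }
            executor.shutdown();                              // accept no new tasks
            executor.awaitTermination(10, TimeUnit.SECONDS);  // wait for the workers
            for (Future<String> result : results) {
                result.get(); // throws ExecutionException if a worker saw a closed connection
                succeeded++;
            }
        } finally {
            connection.close();     // no worker is still using the connection at this point
            executor.shutdownNow(); // no-op if the pool has already terminated
        }
        return succeeded;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("workers that got a session: " + runWorkers(4));
    }
}
```

If this reading applies, the equivalent change in the posted code would be to keep main from closing the connection until the workers are done, and probably to create the session and consumer once per worker before the while loop rather than on every iteration.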