2013-06-05 156 views
1

我正在嘗試使用多個線程來使用jms隊列。 我知道每個線程應該有一個單獨的JMS會話,並且我在我的代碼中做了如下所示的操作。但是,我得到一個奇怪的例外與多線程客戶端消費JMS

這裏的異常堆棧跟蹤:

javax.jms.IllegalStateException: Forbidden call on a closed connection. 
    at org.objectweb.joram.client.jms.Connection.checkClosed(Connection.java:404) 
    at org.objectweb.joram.client.jms.Connection.createSession(Connection.java:530) 
    at MessageWorker.run(ReceiveJmsDemoMultiThreaded.java:96) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 

我需要你的幫助,因爲這是我

import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.QueueConnectionFactory; 
import javax.jms.Session; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
import javax.naming.NamingException; 

public class ReceiveJmsDemoMultiThreaded { 

public static void main(String[] args) { 
    Context context = null; 
    ConnectionFactory factory = null; 
    Connection connection = null; 
    Destination destination = null; 

    try { 
     context = getInitialContext(); 
     factory = (QueueConnectionFactory) context.lookup("JQCF"); 
     destination = (Destination) context.lookup("sampleQueue"); 
     connection = factory.createConnection(); 

     final ExecutorService executor = Executors.newCachedThreadPool(); 
     executor.submit(new MessageWorker(connection, destination)); 

     executor.submit(new MessageWorker(connection, destination)); 

     executor.submit(new MessageWorker(connection, destination)); 

     executor.submit(new MessageWorker(connection, destination)); 

     connection.start(); 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 
     if (context != null) { 
      try { 
       context.close(); 
      } catch (NamingException e) { 
       e.printStackTrace(); 
      } 
     } 

     if (connection != null) { 
      try { 
       connection.close(); 
      } catch (JMSException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

private static InitialContext getInitialContext() throws NamingException { 
    Properties prop = new Properties(); 
    prop.put("java.naming.provider.url", "rmi://localhost:1099"); 
    prop.put("java.naming.factory.initial", 
       "org.objectweb.carol.jndi.spi.MultiOrbInitialContextFactory"); 
    return new InitialContext(prop); 
} 

} 

class MessageWorker extends Thread { 
Connection connection = null; 
Destination dest = null; 
Session session = null; 
Destination destination = null; 

public MessageWorker(Connection connection, Destination dest) { 
    this.connection = connection; 
    this.destination = dest; 
} 
@Override 
public void run() { 
    try { 
     MessageConsumer receiver = null; 
     System.out.println("Starting Thread "+currentThread().getName()); 
     while (true) { 
      try { 
       System.out.println("Waiting for next msg "+currentThread().getName()); 
       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
       receiver = session.createConsumer(destination); 
       Message msg = receiver.receive(); 
       if (msg instanceof Message && msg != null) { 
        System.out.println("STARTING consuming "+msg.toString()+" by thread "+currentThread().getName()); 
        Thread.sleep(2000);//some work here 
        System.out.println("ENDING consuming "+msg.toString()+" by thread "+currentThread().getName()); 
       } 
      } catch (JMSException e) { 

       e.printStackTrace(); 
       System.exit(1); 
      } 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 

    } 
} 
} 

非常感謝

+0

您應該在創建MessageWorker實例之前開始連接。 – user1516873

+0

,順便說一句,你收到的消息很奇怪。通常你可以通過實現javax.jms.MessageListener接口來實現它。 – user1516873

+0

@ user1516873:我嘗試創建MessageWorker實例之前開始連接,但我仍然有完全相同的錯誤:( 實施java.jms.MessageListener用於接收異步消息,但我需要消耗同步信息,即路我使用的方法javax.jms.QueueConsumer.receive() 感謝您的幫助 – vonemya

回答

2

阻塞問題您看到此問題,因爲在主線程中,在將作業提交給Executor Service後,連接將使用以下方式關閉:

 connection.close(); 

所以,當線程試圖創建一個使用此共享連接(其中剛剛關閉)會話,他們得到這個例外。沒有什麼意外的。 僅用於測試,您可以讓主線程長時間休眠,直到所有線程完成接收消息。這樣,你可以確認你沒有收到這個異常。

真正的解決方案可能是關閉執行程序服務並使主線程awaitTermination()等待完成提交的作業。

+0

你是對的。我加入這個「connection.awaitTermination(1 TimeUnit.HOUR)」和它的作品現在 感謝您非常 – vonemya

+0

太好了!那麼,你可以繼續接受答案嗎? – brainOverflow