2015-09-14 177 views
1

我有一個javax.jms.Queue隊列,並讓我的偵聽器偵聽此隊列。我得到消息(一個字符串)並執行一個將該字符串作爲輸入參數傳遞給該進程的進程。一次偵聽一個jms隊列並只處理10條消息

我想只運行一次運行該進程的10個實例。一旦這些完成後,只有下一條消息應該被處理。

如何實現?由於它一次讀取所有消息並運行該進程運行的許多實例,導致服務器被掛起。

// using javax.jms.MessageListener 
message = consumer.receive(5000); 
if (message != null) { 
    try { 
     handler.onMessage(message); //handler is MessageListener instance 
    } 
} 
+1

請將代碼顯示在哪裏,您的監聽器將消息拖放到進程中。 – Aify

+0

我正在使用javax.jms.MessageListener。 message = consumer.receive(5000); 如果(消息!= NULL){ \t \t \t嘗試{ \t \t \t \t handler.onMessage(消息); } handler是MessageListener實例。 – user3262365

+0

您的進程是在線程上執行還是在另一個JVM /本機/任意進程上執行? –

回答

1

嘗試把這個註解你的MDB監聽器:

@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") 
+0

感謝您的快速響應。但是我沒有任何MDBListenere。我手動連接到隊列。 – user3262365

+0

我正在使用javax.jms.MessageListener,只要隊列中有消息,我就執行我的過程。 – user3262365

0

我認爲一個簡單的while檢查就足夠了。這是一些僞代碼。

While (running processes are less than 10) { 
    add one to the running processes list 
    do something with the message 
} 

,並在onMessage代碼:

function declaration of on Message(Parameters) { 
    do something 
    subtract 1 from the running processes list 
} 

確保你使用的計算正在運行的進程的量的變量聲明爲volatile。根據要求

例子:

public static volatile int numOfProcesses = 0; 

while (true) { 
    if (numOfProcesses < 10) { 
     // read a message and make a new process, etc 
     // probably put your receive code here 
     numOfProcesses++; 
    } 
} 

無論您對您的流程編寫了代碼:

// do stuff, do stuff, do more stuff 
// finished stuff 
numOfProcesses--; 
+0

是否有可能限制我的消費者一次只消費很少的消息。現在它只是一次消費所有的消息。你知道嗎? – user3262365

+0

@ user3262365我以前從來沒有遇到過這個問題,所以我不能肯定地說這是可能的,但另一種方法是限制你發送的消息,而不是限制消息隊列。如果您使用一個同步變量並結合一次發送消息10,則應該能夠實現要查找的內容。 – Aify

+0

你能用一些例子來詳細說明嗎? – user3262365

0

我假設你已經從你的外部進程接受hasTerminated消息的方式。此控制器線程將使用Semaphore與JMS偵聽器進行通信。 Semaphore初始化有10個許可證,並且每當外部進程調用TerminationController#terminate(或外部進程與您的偵聽器進程通信)時,它會向Semaphore添加許可證,然後JMSListener必須首先獲得許可證,然後才能調用messageConsumer.release()哪個確保一次只能有十個以上的進程處於活動狀態。

// created in parent class 
private final Semaphore semaphore = new Semaphore(10); 

@Controller 
public class TerminationController { 
    private final semaphore; 

    public TerminationController(Semaphore semaphore) { 
     this.semaphore = semaphore; 
    } 

    // Called from external processes when they terminate 
    public void terminate() { 
     semaphore.release(); 
    } 
} 

public class JMSListener implements Runnable { 
    private final MessageConsumer messageConsumer; 
    private final Semaphore semaphore; 

    public JMSListener(MessageConsumer messageConsumer, Semaphore semaphore) { 
     this.messageConsumer = messageConsumer; 
     this.semaphore = semaphore; 
    } 

    public void run() { 
     while(true) { 
      semaphore.acquire(); 
      Message message = messageConsumer.receive(); 
      // create process from message 
     } 
    } 
} 
+0

Hi @ Zim-Zam O'Pootertoot您能否以一個例子來詳細闡述它? – user3262365

相關問題