2012-10-18 36 views
1

我們有一種情況,我們設置了一個組件來遠程使用spring批處理運行批處理作業。我們發送帶有作業xml路徑,名稱,參數等的JMS消息,然後等待調用批處理客戶端獲取服務器的響應。JMS/spring/AMQ異步處理消息時出現問題或混淆

服務器讀取隊列,並調用適當的方法來運行作業並返回結果,而我們的通信框架不會通過:

this.jmsTemplate.send(queueName, messageCreator); 
    this.LOGGER.debug("Message sent to '" + queueName + "'"); 

    try { 
     final Destination replyTo = messageCreator.getReplyTo(); 
     final String correlationId = messageCreator.getMessageId(); 

     this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ..."); 
     final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='" 
       + correlationId + "'"); 
     this.LOGGER.debug("Response received"); 

理想情況下,我們希望能夠以兩倍叫出runJobSync方法,並有兩個工作同時進行。我們有一個單元測試,做了類似的工作,沒有工作。我意識到這個代碼不是很好,但是,這裏是:

final List結果= Collections.synchronizedList(new ArrayList());

Thread thread1 = new Thread(new Runnable(){ 

     @Override 
     public void run() { 
      client.pingWithDelaySync(1000); 
      result.add(Thread.currentThread().getName()); 
     } 

    }, "thread1"); 

    Thread thread2 = new Thread(new Runnable(){ 

     @Override 
     public void run() { 
      client.pingWithDelaySync(500);    
      result.add(Thread.currentThread().getName()); 
     } 

    }, "thread2"); 

    thread1.start(); 
    Thread.sleep(250); 
    thread2.start(); 

    thread1.join(); 
    thread2.join(); 

    Assert.assertEquals("both thread finished", 2, result.size()); 
    Assert.assertEquals("thread2 finished first", "thread2", result.get(0)); 
    Assert.assertEquals("thread1 finished second", "thread1", result.get(1)); 

當我們運行測試,線程2完成第一,因爲它只是有一個500 millisencond等待,而線程1做了1秒鐘的等待:

Thread.sleep(delayInMs); 
    return result; 

那偉大工程。 當我們在野外運行兩個遠程作業,一個需要大約50秒完成,另一個旨在立即失敗並返回,這不會發生。

啓動50秒作業,然後立即啓動即時失敗作業。客戶端打印我們發送的消息,請求作業運行,服務器打印它收到的第50個請求,但等到完成第50個第二個作業之後再處理第二個消息,即使我們使用ThreadPoolExecutor。

我們正在使用自動確認運行交易。

做了一些遠程調試,AbstractPollingMessageListenerContainer中的Consumer顯示沒有未處理的消息(因此consumer.receive()顯然只是一遍又一遍地返回null)。 amq代理的webgui顯示了2個隊列,1個隊列,1個派遣隊伍和1個派送隊列。這表明AMQ阻止消費者「擁有」第二條信息的是某些東西。 (預取爲1000 btw) 這顯示爲特定隊列的唯一使用者。

我和其他幾個開發人員在過去幾天都在探索,幾乎沒有任何地方。任何建議,如果這是預期的行爲,我們有什麼錯誤配置,或者什麼會在這裏被打破。

被遠程調用的方法是否至關重要?目前,作業處理程序方法使用執行程序在另一個線程中運行作業,並執行future.get()(額外的線程是出於與日誌相關的原因)。

任何幫助是極大的讚賞

回答

0

不知道我完全跟隨,不過關的頂部,可以嘗試以下...

  • 設置concurrentConsumers/maxConcurrentConsumers大於默認值(1)在MessageListenerContainer的
  • 預讀設置爲0,以更好地促進平衡消費者之間信息等