2012-06-29 22 views
1

我有一個基於websphere的代碼片段,用於從MQ隊列中提取消息並處理它們。大多數情況下,代碼可以正常工作,但是每隔幾天,代碼就會繼續運行,但即使隊列中仍有消息,它仍會停止接收消息。爲了讓程序返回工作,我需要對應用程序進行衝擊,然後所有事情都開始正常工作。Websphere 7 MQ應用程序停止提取郵件

這些消息可能非常大(高達4MB的消息),我上運行爲7

我不認爲這似乎代表着一個錯誤的任何消息或異常。

這裏是人的代碼上

public class BlipMqProcessor { 

    protected static final int ONE_SECOND = 1000; 
    protected static final int ONE_HOUR = 60 * 60 * ONE_SECOND; 
    protected static final int MQ_READ_TIMEOUT = Integer.parseInt(Constants.MQ_READ_TIMEOUT_IN_SECONDS) * ONE_SECOND; 

    protected static int previousMqReasonCode; 
    protected static long previousMqErrorTime; 

    private BlipXmlProcessor xmlProcessor; 

    // Member variables for MQ processing 
    protected MQQueueManager qMgr; 
    protected MQQueue queue; 
    protected MQGetMessageOptions gmo; 

    /** 
    * Constructs a new BlipMqProcessor with the given values. 
    */ 
    public BlipMqProcessor() { 
     this(new BlipXmlProcessor()); 
    } 

    /** 
    * Constructs a new BlipMqProcessor with the given values. 
    * @param xmlProcessor the processor that will be used to create the 
    *  staging table entries. 
    */ 
    public BlipMqProcessor(final BlipXmlProcessor xmlProcessor) { 
     super(); 
     this.xmlProcessor = xmlProcessor; 
    } 

    /** 
    * Reads XML messages from the Constants.MQ_ACCESS_QUEUE_NAME 
    * 
    * @throws BlipModelException if there are any 
    */ 
    public void readFromMQ() throws BlipModelException { 
     try { 
      createNewConnectionToMQ(); 
      while(true) { 
       MQMessage outputMessage = new MQMessage(); 
       queue.get(outputMessage,gmo); 
       String blipModelXml = outputMessage.readLine(); 
       BlipLogs.logXML("BlipREQ", "0", blipModelXml); 
       processMessage(blipModelXml); 
       qMgr.commit(); 
      } 
     } catch (final MQException e) { 
      if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) { 
       handleMqException(e); 
      } 
     } catch (final IOException e) { 
      throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e); 
     } finally { 
      cleanupMQResources(); 
     } 
    } 


    /** 
    * Clean up MQ resources. 
    */ 
    private void cleanupMQResources() { 
     // Close queue 
     if(queue != null) { 
      try { 
       queue.close(); 
      }catch(final MQException e) { 
       BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem closing queue: " + e); 
      } 
     } 
     // Disconnect queue manager 
     if(qMgr != null) { 
      try { 
       qMgr.disconnect(); 
      } catch (final MQException e) { 
       BlipModelLogger.error("MQ", "BlipMqProcessor", "Problem disconnecting from qMgr: " + e); 
      } 
     } 
    } 

    protected void createNewConnectionToMQ() throws MQException { 
     try { 
      MQEnvironment.hostname = Constants.MQ_HOST; 
      MQEnvironment.channel = Constants.MQ_CHANNEL; 
      MQEnvironment.port  = Integer.parseInt(Constants.MQ_PORT); 
      if(Constants.MQ_SSL_CIPHER_SUITE != null) { 
       MQEnvironment.sslCipherSuite = Constants.MQ_SSL_CIPHER_SUITE; 
       MQEnvironment.sslPeerName = Constants.MQ_SSL_PEER; 
      } else { 
       MQEnvironment.sslCipherSuite = ""; 
       MQEnvironment.sslPeerName = ""; 
      } 

      qMgr = new MQQueueManager(Constants.MQ_QMGR); 
      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF; 
      queue = qMgr.accessQueue(Constants.MQ_IN_ACCESS_QUEUE, openOptions); 
      gmo = new MQGetMessageOptions(); 
      gmo.options = MQC.MQGMO_WAIT | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING; 
      gmo.waitInterval = MQ_READ_TIMEOUT; 
     } finally { 
      MQEnvironment.sslCipherSuite = ""; 
      MQEnvironment.sslPeerName = ""; 
     } 
    } 

    protected void handleMqException(final MQException e) { 
     long currentTime = System.currentTimeMillis(); 
     long timeBetweenMqErrors = currentTime - previousMqErrorTime; 
     if (previousMqReasonCode != e.reasonCode || timeBetweenMqErrors > ONE_HOUR) { 
      previousMqReasonCode = e.reasonCode; 
      previousMqErrorTime = currentTime; 
      BlipModelLogger.error("MQ", "BlipMqProcessor", "MQException reading from Access Queue: " + e); 
     } 
    } 


} 
+0

其餘的......請檢查[我的問題在這裏](http://stackoverflow.com/questions/11056776/convert-string-from-ebcdic-to-unicode-utf8) – mKorbel

+0

您的輸入隊列有「BOQTHRESH」和「BOQNAME」已設置?如果您的應用程序遇到有害消息並且沒有定義退出隊列和退出閾值,則它會將該消息退出,直至超出JEE容器的退出閾值。此時,該應用停止獲取新消息,但由於它不認爲這是致命的,它看起來很高興。一些有害消息錯誤是由調整引起的,因此在重新啓動時清除。這是一個很長的鏡頭,但很容易定義一個新的隊列並設置BOQNAME(NEW_QUEUE)和BOQTHRESH(5),每個應用程序都應該有這些。 –

回答

0

更改readFromMQ方法評論;

public void readFromMQ() throws BlipModelException { 
    try { 
     createNewConnectionToMQ(); 
     while(true) { 
      try { 
      MQMessage outputMessage = new MQMessage(); 
      queue.get(outputMessage,gmo); 
      String blipModelXml = outputMessage.readLine(); 
      BlipLogs.logXML("BlipREQ", "0", blipModelXml); 
      processMessage(blipModelXml); 
      qMgr.commit(); 
      } catch (MQException e) { 
      if (e.reasonCode != MQException.MQRC_NO_MSG_AVAILABLE) { 
       throw e; 
      } 
      } 
     } 
    } catch (final MQException e) { 
     handleMqException(e); 
    } catch (final IOException e) { 
     throw new BlipModelException("MQ", "Error reading MQ message.", "BlipMqProcessor.readFromMQ", e); 
    } finally { 
     cleanupMQResources(); 
    } 
} 

將是快速解決方案;但不夠優雅。這裏有很多重構的空間。

發生了什麼事實是您實際上正在獲取MQRC_NO_MSG_AVAILABLE,因爲有趣的是,沒有其他消息可用於檢索(在某個時間點,而不是您想象的時間);當你決定忽略這個異常時,你已經退出while(true)循環。

您不能使用queue.getCurrentQueueDepth(),因爲它不是高性能的(顯然),也是因爲它不適用於羣集上的別名隊列或隊列。這是非常多的;它很糟糕。

+0

當你打開一個隊列時,WMQ返回已解析的名字,這樣你就可以查詢你想要的深度。如果您只關心隊列深度是否爲「0」,則使用'queue.getCurrentQueueDepth()'不會執行*因爲您想獲取下一條消息*。執行'GET'並接收消息或MQRC_NO_MSG_AVAILABLE通常比查詢深度然後在深度<0(並消除競爭條件)時執行GET更爲高效。查詢已經打開句柄的隊列的深度是相當快的。 –