2017-05-08 52 views
1

我的代碼:C#:IBM MQ「解鎖」消息使用Read()方法時

 //Initialize MQMessage 
     MQMessage message = new MQMessage(); 

     //Initialize WebMQConnection 
     WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty); 
     mqRequestConnection.Open(); 
     mqRequestConnection.Read(message); 


     //Get the contents as a string 
     string body = message.ReadString(message.MessageLength); 
     return body; 

該代碼是一個控制檯應用程序的一部分,並且如預期用於瀏覽隊列順序和讀取每個消息的工作原理。它解析來自平面文件的字符串內容。

但是,似乎Read()方法也會鎖定消息,直到程序關閉。即使我在一個循環中運行該程序以順序讀取所有消息,但在程序完全關閉之前似乎不會「釋放」這些消息。

我已經試過取得和放置,處置,退出等,並沒有什麼似乎工作,除了停止整個控制檯應用程序的執行。

+0

'WebSphereMQConnection'看起來並不像IBM提供的東西。你可以發佈這是做什麼?它是否在同步點下打開隊列?如果是這樣,您需要爲MQ添加提交以「釋放」消息。 – JoshMc

+0

@JoshMc它是一個類的構造函數,最終在調用處理Syncpoint操作的Open()時創建隊列管理器。這是傳統的代碼,我沒有太多的控制權。我嘗試添加一個提交,但它似乎沒有做任何事情。我開始認爲這個問題可能與我給出的核心代碼有關。 – Reed

+0

也許有一種方法可以執行提交? – JoshMc

回答

0
//Initialize WebMQConnection 
WebSphereMQConnection mqRequestConnection = new WebSphereMQConnection(initQMName, initQChannel, initQConnection, initQName, string.Empty, string.Empty); 
mqRequestConnection.Open(); 
mqRequestConnection.Read(message); 

世界上是什麼?這當然不是IBM MQ類和方法。所以,這是一個本土的階級。

因此,Read()方法實際上執行'瀏覽'。名字不好的方法。我敢打賭,在您的Read()方法中,它使用MQGET上的MQGMO_LOCK選項。

爲什麼不擺脫WebSphereMQConnection類並只寫純IBM MQ .NET代碼?

這裏是一個MQ CS .NET示例程序來瀏覽隊列中沒有任何消息鎖定:

using System; 
using IBM.WMQ; 

/// <summary> Program Name 
/// MQTest62B 
/// 
/// Description 
/// This C# class will connect to a remote queue manager 
/// and get (browse) a message from a queue using a managed .NET environment. 
/// 
/// Sample Command Line Parameters 
/// -h 127.0.0.1 -p 1415 -c TEST.CHL -m MQWT1 -q TEST.Q1 
/// </summary> 
/// <author> Roger Lacroix, Capitalware Inc. 
/// </author> 
namespace MQTest62 
{ 
    public class MQTest62B 
    { 
     private System.Collections.Hashtable inParms = null; 
     private System.String qManager; 
     private System.String outputQName; 
     private System.String userID = "tester"; 
     private System.String password = "barney"; 

     /* 
     * The constructor 
     */ 
     public MQTest62B() 
      : base() 
     { 
     } 

     /// <summary> Make sure the required parameters are present.</summary> 
     /// <returns> true/false 
     /// </returns> 
     private bool allParamsPresent() 
     { 
      bool b = inParms.ContainsKey("-h") && inParms.ContainsKey("-p") && 
        inParms.ContainsKey("-c") && inParms.ContainsKey("-m") && 
        inParms.ContainsKey("-q"); 
      if (b) 
      { 
       try 
       { 
        System.Int32.Parse((System.String)inParms["-p"]); 
       } 
       catch (System.FormatException e) 
       { 
        b = false; 
       } 
      } 

      return b; 
     } 

     /// <summary> Extract the command-line parameters and initialize the MQ variables.</summary> 
     /// <param name="args"> 
     /// </param> 
     /// <throws> IllegalArgumentException </throws> 
     private void init(System.String[] args) 
     { 
      inParms = System.Collections.Hashtable.Synchronized(new System.Collections.Hashtable(14)); 
      if (args.Length > 0 && (args.Length % 2) == 0) 
      { 
       for (int i = 0; i < args.Length; i += 2) 
       { 
        inParms[args[i]] = args[i + 1]; 
       } 
      } 
      else 
      { 
       throw new System.ArgumentException(); 
      } 

      if (allParamsPresent()) 
      { 
       qManager = ((System.String)inParms["-m"]); 
       outputQName = ((System.String)inParms["-q"]); 
       // Set up MQ environment 
       MQEnvironment.Hostname = ((System.String)inParms["-h"]); 
       MQEnvironment.Channel = ((System.String)inParms["-c"]); 
       try 
       { 
        MQEnvironment.Port = System.Int32.Parse((System.String)inParms["-p"]); 
       } 
       catch (System.FormatException e) 
       { 
        MQEnvironment.Port = 1414; 
       } 

       if (userID != null) 
        MQEnvironment.UserId = userID; 

       if (password != null) 
        MQEnvironment.Password = password; 
      } 
      else 
      { 
       throw new System.ArgumentException(); 
      } 
     } 

     /// <summary> Connect, open queue, read (browse) a message, close queue and disconnect. </summary> 
     /// 
     private void testReceive() 
     { 
      MQQueueManager qMgr = null; 
      MQQueue inQ = null; 
      int openOptions = MQC.MQOO_BROWSE + MQC.MQOO_FAIL_IF_QUIESCING; 

      try 
      { 
       qMgr = new MQQueueManager(qManager); 
       System.Console.Out.WriteLine("MQTest62B successfully connected to " + qManager); 

       inQ = qMgr.AccessQueue(outputQName, openOptions, null, null, null); // no alternate user id 
       System.Console.Out.WriteLine("MQTest62B successfully opened " + outputQName); 

       testLoop(inQ); 

      } 
      catch (MQException mqex) 
      { 
       System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode); 
      } 
      catch (System.IO.IOException ioex) 
      { 
       System.Console.Out.WriteLine("MQTest62B ioex=" + ioex); 
      } 
      finally 
      { 
       try 
       { 
        if (inQ != null) 
         inQ.Close(); 
        System.Console.Out.WriteLine("MQTest62B closed: " + outputQName); 
       } 
       catch (MQException mqex) 
       { 
        System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode); 
       } 

       try 
       { 
        if (qMgr != null) 
         qMgr.Disconnect(); 
        System.Console.Out.WriteLine("MQTest62B disconnected from " + qManager); 
       } 
       catch (MQException mqex) 
       { 
        System.Console.Out.WriteLine("MQTest62B cc=" + mqex.CompletionCode + " : rc=" + mqex.ReasonCode); 
       } 
      } 
     } 

     private void testLoop(MQQueue inQ) 
     { 
      bool flag = true; 
      MQMessage msg = new MQMessage(); 
      MQGetMessageOptions gmo = new MQGetMessageOptions(); 
      gmo.Options |= MQC.MQGMO_BROWSE_NEXT | MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING; 
      gmo.WaitInterval = 500; // 1/2 second wait time or MQC.MQEI_UNLIMITED 

      while (flag) 
      { 
       try 
       { 
        msg = new MQMessage(); 
        inQ.Get(msg, gmo); 
        System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength)); 
       } 
       catch (MQException mqex) 
       { 
        System.Console.Out.WriteLine("MQTest62B CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode); 
        if (mqex.Reason == MQC.MQRC_NO_MSG_AVAILABLE) 
        { 
         // no meesage - life is good - loop again 
        } 
        else 
        { 
         flag = false; // severe error - time to exit 
        } 
       } 
       catch (System.IO.IOException ioex) 
       { 
        System.Console.Out.WriteLine("MQTest62B ioex=" + ioex); 
       } 
      } 
     } 

     /// <summary> main line</summary> 
     /// <param name="args"> 
     /// </param> 
     //  [STAThread] 
     public static void Main(System.String[] args) 
     { 
      MQTest62B write = new MQTest62B(); 

      try 
      { 
       write.init(args); 
       write.testReceive(); 
      } 
      catch (System.ArgumentException e) 
      { 
       System.Console.Out.WriteLine("Usage: MQTest62B -h host -p port -c channel -m QueueManagerName -q QueueName"); 
       System.Environment.Exit(1); 
      } 
      catch (MQException e) 
      { 
       System.Console.Out.WriteLine(e); 
       System.Environment.Exit(1); 
      } 

      System.Environment.Exit(0); 
     } 
    } 
}