2011-03-23 69 views
4

我不是程序員,但我試圖通過給他們一些指導來幫助他們。我們在msmq上不再有內部專業知識。我們正在嘗試使用它來將一些功能與調度應用程序集成。在2008R2上正確使用Message Queue

調度應用程序通過使用自定義內置dll進行網絡調用來關閉工作。該dll調用weburl。網絡應用程序將運行其任務並將更新發送到網站,以瞭解其執行的任務。網站將消息寫入隊列。調用該站點的dll正在監視隊列中是否有分配給該作業的標籤的消息。當它收到最終狀態消息時,它會關閉。

我們每隔幾個小時就會收到以下消息。我們每小時運行近100個工作,使用這種方法。在底部列出的代碼中,jobid對應於消息隊列中消息的標籤。每個作業在開始時都會發佈一個jobid,並將其用作發送給該作業的msmq的每條消息的標籤。

System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor. 
    at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType) 
    at System.Messaging.MessageEnumerator.get_Current() 

這是它的代碼。

while (running) 
     { 
      // System.Console.WriteLine("Begin Peek"); 
      messageQueue.Peek(); 
      //System.Console.WriteLine("End Peek"); 
      messageQueue.MessageReadPropertyFilter.SetAll(); 

      using (MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2()) 
      { 
       enumerator.Reset(); 

       while (enumerator.MoveNext()) 
       { 
        Message msg = enumerator.Current; 

        if (msg.Label.Equals(this.jobid)) 
        { 
         StringBuilder sb = new StringBuilder(); 
         /* 
         try 
         { 
          sb.Append("Message Source: "); 
          //sb.Append(msg.SourceMachine); 
          sb.Append(" Sent: "); 
          sb.Append(msg.SentTime); 
          sb.Append(" Label "); 
          sb.Append(msg.Label); 
          sb.Append(" ID: "); 
          sb.Append(msg.Id); 
          sb.Append(" CorrelationID: "); 
          sb.Append(msg.CorrelationId); 
          sb.Append(" Body Type: "); 
          sb.Append(msg.BodyType); 
         } 
         catch (Exception) 
         { 
          throw; 
         } 
         finally 
         { 
          System.Console.WriteLine(sb.ToString()); 
         } 
         */ 
         //System.Console.WriteLine("Receiving Message started"); 
         using (Message message = messageQueue.ReceiveById(msg.Id)) 
         { 
          //System.Console.WriteLine("Receiving Message Complete"); 
          //sb = new StringBuilder(); 
          string bodyText = string.Empty; 

          try 
          { 
           System.IO.StringWriter sw = new System.IO.StringWriter(sb); 
           System.IO.StreamReader sr = new System.IO.StreamReader(message.BodyStream); 

           while (!sr.EndOfStream) 
           { 
            sw.WriteLine(sr.ReadLine()); 
           } 
           sr.Close(); 
           sw.Close(); 
           bodyText = (string) FromXml(sb.ToString(), typeof(string)); 
           int indx = bodyText.IndexOf(','); 
           string tokens = bodyText.Substring(indx + 1); 
           indx = tokens.IndexOf(','); 
           string command = tokens.Substring(0, indx); 
           tokens = tokens.Substring(indx + 1); 
           if (command.Equals(COMMAND_STARTED)) 
           { 
            System.Console.WriteLine("STARTED " + tokens); 
           } 
           else if (command.Equals(COMMAND_UPDATE)) 
           { 
            System.Console.WriteLine(tokens); 
           } 
           else if (command.Equals(COMMAND_ENDED_OK)) 
           { 
            System.Console.WriteLine(tokens); 
            System.Console.WriteLine("WEBJOB: Success"); 
            finalResults = new FinalResults(0, 0, "Success"); 
            running = false; 
           } 
           else if (command.Equals(COMMAND_ENDED_WARNING)) 
           { 
            System.Console.WriteLine(tokens); 
            System.Console.WriteLine("WEBJOB: Warning Issued"); 
            finalResults = new FinalResults(1, 1, "Warning"); 
            running = false; 
           } 
           else if (command.Equals(COMMAND_ENDED_FAIL)) 
           { 
            System.Console.WriteLine(tokens); 
            System.Console.WriteLine("WEBJOB: Failure"); 
            finalResults = new FinalResults(2, 16, "Failure"); 
            running = false; 
           } 
          } 
          catch (Exception) 
          { 
           throw; 
          } 
          finally 
          { 
           //System.Console.WriteLine("Body: " + bodyText); 
          } 
         } 
        } 
       } 
      } 
     } 

     return finalResults; 
    } 

    MessageQueue messageQueue = null; 
    string webServiceURL = ""; 
    Dictionary<string, string> parms = new Dictionary<string, string>(); 
    string jobid = "NONE"; 

回答

3

kprobst的解釋可能是發生了什麼事情。即使您看到此特定消息位於隊列中,但如果其他應用程序(或同一應用程序的不同實例)從此隊列中選取(任何)消息,則會使遊標無效。

本質上,如果多個進程從同一隊列中饋送,則此代碼不適用於工作。

+0

順便說一句。如果您使用的是2008 R2,則可以通過使用子隊列而無需太多修改即可使其工作。 http://technet.microsoft.com/en-us/library/cc730897(WS.10).aspx。就你而言,每個工作都是一個子隊列。 – Naraen 2011-03-23 22:49:37

+0

子隊列看起來很有希望。目前通過一些例子來看看它是如何工作的以及它是如何工作的。 – ddjammin 2011-03-24 02:11:41

+0

忘記標記爲答案。我們確實實現了subqueue,並且已經證明是可靠的,謝謝你的幫助。 – ddjammin 2011-10-11 04:07:13

3

這通常意味着接收操作可以完成之前,正在接收的消息正被其他東西刪除。另一個應用程序或另一個線程與您的代碼使用不同的隊列引用在同一個進程中。

是否有可能有兩個處理器代碼實例(我猜這是一個控制檯應用程序)同時運行?在相同或不同的機器上?或者某些其他應用程序或工具從隊列中刪除消息?

曾經有一個.NET 2.0的預發佈版本中存在一個錯誤,它會在某些壓力條件下導致此問題,但據我記憶,它們在發貨前已經修復。

+1

消息實際上仍然在隊列中。作業失敗並顯示錯誤消息。我可以查看隊列並仍然可以找到屬於該作業的消息標籤。就好像它被鎖定一樣。如果我理解代碼(仍然聲稱不是開發人員),則每個創建的作業也將創建一個偵聽器,以監聽使用其jobid /標籤偵聽郵件的隊列。它會生成一個隊列迭代的動態列表,並用它的標籤處理任何消息。它會在它收到結束狀態消息時關閉。 – ddjammin 2011-03-23 22:35:51

+0

我明白了。那麼你可能會遇到CLR中的一個bug。我可以告訴你的一件事是,這是一種使用MSMQ的相當非標準的方式;通常異步接收效率更高,這就是我使用的。即使在高容量的情況下,我也從未遇到過這個問題。你有幾個選擇。一,完全重寫。二,修改隊列,使其成爲事務性的,並改變代碼來處理。這可能會擺脫錯誤。三,向微軟報告,看看他們是否可以幫助你與KB補丁或類似的東西。 – kprobst 2011-03-23 22:49:00

+0

感謝您的回覆。該計劃將重寫這個。這將不得不等到下週,接管代碼的開發人員纔會回來,我們可以同步。在此期間,我們對錯誤信息做了一次嘗試性的調查,因爲信息並沒有真正消失,所以再次檢查。這只是爲了讓我們到下週。 – ddjammin 2011-03-24 02:09:49

1

由於MessageQueue的內部方法ReceiveCurrent中存在併發問題,因此失敗。 異常堆棧跟蹤顯示發起於枚舉器的調用。當前行和異常發生在ReceiveCurrent。 Enumerator.Current使用「peek」選項調用ReceiveCurrent。你可以問,當我遇到同樣的問題時,我也遇到了這種情況,如何通過「消息已收到」錯誤消失?它只是試圖偷看還沒有收到的下一封郵件? 那麼答案就在於ReceiveCurrent代碼,這是avaliable審查這裏: https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

ReceiveCurrent首先使得StaleSafeReceive通話偷看的下一條消息。但是如果這個調用返回,它需要更多的內存來接收整個消息( 「while(MessageQueue.IsMemoryError(status)」在其源代碼中),它分配所需的內存並進行另一次StaleSafeReceive調用以獲取消息。 這是非常經典的Win32 API的使用模式由於最終基於其爲C。

這裏的問題是,如果內部ReceiveCurrent所述第一和第二呼叫到StaleSafeReceive另一個進程或線程之間「接收」,即從隊列中刪除該消息,第二次調用拋出這個確切的異常。這就是「窺視」操作失敗的原因。 請注意,它可能是由導致異常的枚舉器正在掃描的任何消息,而不是正在查找的消息。這就解釋了爲什麼具有該作業ID的消息在拋出異常並且方法失敗後仍然存在於隊列中。

可以做些什麼是防止enumerator.Current調用try catch,如果這個特定的異常被捕獲,只需繼續枚舉與隊列中的下一個可用消息。

我曾經使用過Cursor對象而不是枚舉器,但它遇到了同樣的問題。但使用Cursor的用法還有另外一種方法來降低這種情況發生的風險,即在掃描/窺視消息時關閉當前Queue對象的MessagePropertyFilter的所有不需要的屬性,尤其是Body屬性。因爲在窺視過程中通常不需要接收主體,但是大多數情況下,消息的主體會導致內存重新分配,並且需要在ReceiveCurrent中調用第二個StaleSafeReceive調用。 對於這種異常,仍然需要使用直接使用Cursor來調用這個異常。

相關問題