2010-02-06 34 views
8

我有一個場景,我需要儘快爲隊列刪除一個項目,一旦處理完畢。 我明白從集合而在循環我無法刪除的項目,但不知道如果事情 可以用枚舉器等來完成...如何修改循環中的隊列集合?

這僅僅是一個基本的例子拋出一個錯誤 「系列後,已修改枚舉器被實例化。「

有什麼建議嗎?非常感謝!!!

代碼如下:

 class Program 
     { 
      static void Main() 
      { 

       Queue<Order> queueList = GetQueueList(); 

       foreach (Order orderItem in queueList) 
       { 
        Save(orderItem); 
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
        queueList.Dequeue(); 
       } 
       Console.Read(); 

      } 

      private static void Save(Order orderItem) 
      { 
       //we are pretending to save or do something. 
      } 

      private static Queue<Order>GetQueueList() 
      { 
       Queue<Order> orderQueue = new Queue<Order>(); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 1" }); 
       orderQueue.Enqueue(new Order { Id = 1, Name = "Order 2" }); 
       orderQueue.Enqueue(new Order { Id = 2, Name = "Order 3" }); 
       orderQueue.Enqueue(new Order { Id = 3, Name = "Order 4" }); 
       orderQueue.Enqueue(new Order { Id = 4, Name = "Order 5" }); 
       return orderQueue; 
      } 
     } 

     public class Order 
     { 
      public int Id { get; set; } 
      public string Name { get; set; } 
     } 

回答

10

您的foreach更改爲:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

編輯:

要重新處理,如果保存失敗做這樣的事情:

while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 

    if (!Save(orderItem)) 
    { 
     queueList.Enqueue(orderItem); // Reprocess the failed save, probably want more logic to prevent infinite loop 
    } 
    else 
    { 
     Console.WriteLine("Successfully saved: {0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 

編輯:

約翰ķ提到線程安全這是一個有效的關注,如果你有多個線程訪問相同Queue。涉及簡單線程安全問題的ThreadSafeQueue類參見http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487


編輯:這是我一直強調大家:-)

這裏提到的線程安全問題的範例線程安全的例子。如圖所示,默認Queue可以「遺漏」物品,同時仍然減少計數。

更新:更好地表示問題。我絕不會將空項添加到Queue,但標準Queue.Dequeue()會返回多個空值。僅這一點就沒問題,但這樣做會從內部收集中刪除有效的項目,並減少Count。這是一個安全的假設,在這個特定的例子中,從Queue.Dequeue()操作返回的每個null項表示一個從未處理過的有效項目。

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace SO_ThreadSafeQueue 
{ 
    class Program 
    { 
     static int _QueueExceptions; 
     static int _QueueNull; 
     static int _QueueProcessed; 

     static int _ThreadSafeQueueExceptions; 
     static int _ThreadSafeQueueNull; 
     static int _ThreadSafeQueueProcessed; 

     static readonly Queue<Guid?> _Queue = new Queue<Guid?>(); 
     static readonly ThreadSafeQueue<Guid?> _ThreadSafeQueue = new ThreadSafeQueue<Guid?>(); 
     static readonly Random _Random = new Random(); 

     const int Expected = 10000000; 

     static void Main() 
     { 
      Console.Clear(); 
      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Creating queues..."); 

      for (int i = 0; i < Expected; i++) 
      { 
       Guid guid = Guid.NewGuid(); 
       _Queue.Enqueue(guid); 
       _ThreadSafeQueue.Enqueue(guid); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Processing queues..."); 

      for (int i = 0; i < 100; i++) 
      { 
       ThreadPool.QueueUserWorkItem(ProcessQueue); 
       ThreadPool.QueueUserWorkItem(ProcessThreadSafeQueue); 
      } 

      int progress = 0; 

      while (_Queue.Count > 0 || _ThreadSafeQueue.Count > 0) 
      { 
       Console.SetCursorPosition(0, 0); 

       switch (progress) 
       { 
        case 0: 
         { 
          Console.WriteLine("Processing queues... |"); 
          progress = 1; 
          break; 
         } 
        case 1: 
         { 
          Console.WriteLine("Processing queues... /"); 
          progress = 2; 
          break; 
         } 
        case 2: 
         { 
          Console.WriteLine("Processing queues... -"); 
          progress = 3; 
          break; 
         } 
        case 3: 
         { 
          Console.WriteLine("Processing queues... \\"); 
          progress = 0; 
          break; 
         } 
       } 

       Thread.Sleep(200); 
      } 

      Console.SetCursorPosition(0, 0); 
      Console.WriteLine("Finished processing queues..."); 
      Console.WriteLine("\r\nQueue Count:   {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _Queue.Count, _QueueProcessed, _QueueExceptions, _QueueNull); 
      Console.WriteLine("ThreadSafeQueue Count: {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _ThreadSafeQueue.Count, _ThreadSafeQueueProcessed, _ThreadSafeQueueExceptions, _ThreadSafeQueueNull); 

      Console.WriteLine("\r\nPress any key..."); 
      Console.ReadKey(); 
     } 

     static void ProcessQueue(object nothing) 
     { 
      while (_Queue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _Queue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _QueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _QueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _QueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     static void ProcessThreadSafeQueue(object nothing) 
     { 
      while (_ThreadSafeQueue.Count > 0) 
      { 
       Guid? currentItem = null; 

       try 
       { 
        currentItem = _ThreadSafeQueue.Dequeue(); 
       } 
       catch (Exception) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueExceptions); 
       } 

       if (currentItem != null) 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueProcessed); 
       } 
       else 
       { 
        Interlocked.Increment(ref _ThreadSafeQueueNull); 
       } 

       Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times 
      } 
     } 

     /// <summary> 
     /// Represents a thread safe <see cref="Queue{T}"/> 
     /// </summary> 
     /// <typeparam name="T"></typeparam> 
     public class ThreadSafeQueue<T> : Queue<T> 
     { 
      #region Private Fields 
      private readonly object _LockObject = new object(); 
      #endregion 

      #region Public Properties 
      /// <summary> 
      /// Gets the number of elements contained in the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new int Count 
      { 
       get 
       { 
        int returnValue; 

        lock (_LockObject) 
        { 
         returnValue = base.Count; 
        } 

        return returnValue; 
       } 
      } 
      #endregion 

      #region Public Methods 
      /// <summary> 
      /// Removes all objects from the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      public new void Clear() 
      { 
       lock (_LockObject) 
       { 
        base.Clear(); 
       } 
      } 

      /// <summary> 
      /// Removes and returns the object at the beggining of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <returns></returns> 
      public new T Dequeue() 
      { 
       T returnValue; 

       lock (_LockObject) 
       { 
        returnValue = base.Dequeue(); 
       } 

       return returnValue; 
      } 

      /// <summary> 
      /// Adds an object to the end of the <see cref="ThreadSafeQueue{T}"/> 
      /// </summary> 
      /// <param name="item">The object to add to the <see cref="ThreadSafeQueue{T}"/></param> 
      public new void Enqueue(T item) 
      { 
       lock (_LockObject) 
       { 
        base.Enqueue(item); 
       } 
      } 

      /// <summary> 
      /// Set the capacity to the actual number of elements in the <see cref="ThreadSafeQueue{T}"/>, if that number is less than 90 percent of current capactity. 
      /// </summary> 
      public new void TrimExcess() 
      { 
       lock (_LockObject) 
       { 
        base.TrimExcess(); 
       } 
      } 
      #endregion 
     } 

    } 
} 
+0

嗨That works.What about if we want to dequeue the item only save if was successfull.Can I still do that? 抱歉+謝謝不熟悉隊列 – user9969 2010-02-06 19:54:27

+1

@ devnet247:不是真的。如果您沒有將頂部的物品出列,那麼您無法得到頂部的物品。您需要將失敗的項目移動到隊列的_tail_,就像本示例所做的那樣。 – 2010-02-06 20:02:58

+0

非常感謝您的幫助。現在這樣做。 我必須實現完整的線程,並且這個過程發生在wcf服務中。另一個要學習的事情。再次感謝 – user9969 2010-02-06 20:04:14

0

對我來說,它看起來像你正在嘗試處理隊列中的元素一個接一個。

如何在while循環中打包並處理來自Dequeue的每個元素,直到隊列爲空?

+0

在生產代碼中,我們有一個需要處理的訂單隊列,在他們每個人之後,我需要將他們出列。 你能告訴我一小段你的意思嗎? 謝謝 – user9969 2010-02-06 19:50:46

+0

@ devnet247:看到我​​的答案代碼片段。 – 2010-02-06 19:52:21

1

的foreach作爲一個合理的方式,通過隊列迭代時,你是不是刪除項目

當要刪除和處理項目中,線程安全的,正確的方法是隻刪除它們 一個在並在刪除後處理它們。

的方法之一是這個

// the non-thread safe way 
// 
while (queueList.Count > 0) 
{ 
    Order orderItem = queueList.Dequeue(); 
    Save(orderItem); 
    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
} 

有可能在隊列queueList.Count 和queueList.Dequeue()之間變更的項目數,所以是線程安全的,你必須只使用出隊,但當隊列爲空時,出隊將拋出,因此您必須使用異常處理程序。

// the thread safe way. 
// 
while (true) 
{ 
    Order orderItem = NULL; 
    try { orderItem = queueList.Dequeue(); } catch { break; } 
    if (null != OrderItem) 
    { 
     Save(orderItem); 
     Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name); 
    } 
} 
+0

圍繞捕獲的嘗試不會解決線程安全問題,因爲在許多線程觸及相同的隊列時,Dequeue()可能會在實際從內部集合中刪除項目時返回空對象。有關更好的選項,請參閱http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487。 – 2010-02-06 20:00:03

+0

@Cory:謝謝。你有這個行爲的參考嗎? – 2010-02-06 20:01:03

+0

@John:沒有經驗。我最近編寫了一個應用程序,它生成了100個線程來處理來自單個'Queue'文件的文件,並注意到即使我的Queue.Count變爲零,其他「已處理」計數器也沒有加到最初的Queue.Count '。無論我在同一個「隊列」上投擲了多少個線程,我添加到「ThreadSafeQueue」的簡單鎖定提供了一致的結果。 – 2010-02-06 20:04:28