2013-11-28 52 views
1

我需要處理用戶請求一個接一個(類似於像隊列作業)創建一個線程隊列

這是我有:

Thread checkQueue; 
Boolean IsComplete = true; 

protected void Start() 
{ 
    checkQueue = new Thread(CheckQueueState); 
    checkQueue.Start();  
} 

private void CheckQueueState() 
    { 
     while (true) 
     { 
      if (checkIsComplete) 
      { 
       ContinueDoSomething(); 

       checkQueue.Abort(); 
       break; 
      } 
      System.Threading.Thread.Sleep(1000); 
     } 
    } 

protected void ContinueDoSomething() 
{ 
    IsComplete = false; 
    ... 
    ... 
    IsComplete = true; //when done, set it to true 
} 

每次當有來自用戶的新要求,系統會調用Start()函數並檢查前一個工作是否完成,如果是,則繼續下一個工作。

但我不確定這樣做是否正確。

任何改進或任何更好的方法來做到這一點?

+0

您有效地不能使用Thread.Abort的。在SO上搜索以找到關於原因的一些討論。 – usr

+0

您確定要在ASP.NET中執行此操作嗎?這聽起來像是在服務器上運行並獨立於客戶端瀏覽器運行的東西。 –

+0

@usr好的。謝謝你的建議。 – My2ndLovE

回答

1

我喜歡usr關於使用TPL Dataflow的建議。如果您有能力爲項目添加外部依賴項(TPL Dataflow不是作爲.NET框架的一部分分發),那麼它爲您的問題提供了一個乾淨的解決方案。

但是,如果您堅持框架所提供的內容,您應該看看BlockingCollection<T>,它與您試圖實施的生產者 - 消費者模式很好地協同工作。

我把一個快速的.NET 4.0例子拋在一起,以說明如何在你的場景中使用它。這不是很瘦,因爲它有很多電話Console.WriteLine()。但是,如果你把所有的混亂都解決了,那就非常簡單了。

在它的中心是一個BlockingCollection<Action>,它獲取Action代表添加到它從任何線程,和一個線程專用於出隊,並在它們被添加的確切順序執行這些Action小號順序。

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace SimpleProducerConsumer 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Main thread id is {0}.", Thread.CurrentThread.ManagedThreadId); 

      using (var blockingCollection = new BlockingCollection<Action>()) 
      { 
       // Start our processing loop. 
       var actionLoop = new Thread(() => 
       { 
        Console.WriteLine(
         "Starting action loop on thread {0} (dedicated action loop thread).", 
         Thread.CurrentThread.ManagedThreadId, 
         Thread.CurrentThread.IsThreadPoolThread); 

        // Dequeue actions as they become available. 
        foreach (var action in blockingCollection.GetConsumingEnumerable()) 
        { 
         // Invoke the action synchronously 
         // on the "actionLoop" thread. 
         action(); 
        } 

        Console.WriteLine("Action loop terminating."); 
       }); 

       actionLoop.Start(); 

       // Enqueue some work. 
       Console.WriteLine("Enqueueing action 1 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId); 
       blockingCollection.Add(() => SimulateWork(1)); 

       Console.WriteLine("Enqueueing action 2 from thread {0} (main thread).", Thread.CurrentThread.ManagedThreadId); 
       blockingCollection.Add(() => SimulateWork(2)); 

       // Let's enqueue it from another thread just for fun. 
       var enqueueTask = Task.Factory.StartNew(() => 
       { 
        Console.WriteLine(
         "Enqueueing action 3 from thread {0} (task executing on a thread pool thread).", 
         Thread.CurrentThread.ManagedThreadId); 

        blockingCollection.Add(() => SimulateWork(3)); 
       }); 

       // We have to wait for the task to complete 
       // because otherwise we'll end up calling 
       // CompleteAdding before our background task 
       // has had the chance to enqueue action #3. 
       enqueueTask.Wait(); 

       // Tell our loop (and, consequently, the "actionLoop" thread) 
       // to terminate when it's done processing pending actions. 
       blockingCollection.CompleteAdding(); 

       Console.WriteLine("Done enqueueing work. Waiting for the loop to complete."); 

       // Block until the "actionLoop" thread terminates. 
       actionLoop.Join(); 

       Console.WriteLine("Done. Press Enter to quit."); 
       Console.ReadLine(); 
      } 
     } 

     private static void SimulateWork(int actionNo) 
     { 
      Thread.Sleep(500); 
      Console.WriteLine("Finished processing action {0} on thread {1} (dedicated action loop thread).", actionNo, Thread.CurrentThread.ManagedThreadId); 
     } 
    } 
} 

,輸出是:

0.016s: Main thread id is 10. 
0.025s: Enqueueing action 1 from thread 10 (main thread). 
0.026s: Enqueueing action 2 from thread 10 (main thread). 
0.027s: Starting action loop on thread 11 (dedicated action loop thread). 
0.028s: Enqueueing action 3 from thread 6 (task executing on a thread pool thread). 
0.028s: Done enqueueing work. Waiting for the loop to complete. 
0.527s: Finished processing action 1 on thread 11 (dedicated action loop thread). 
1.028s: Finished processing action 2 on thread 11 (dedicated action loop thread). 
1.529s: Finished processing action 3 on thread 11 (dedicated action loop thread). 
1.530s: Action loop terminating. 
1.532s: Done. Press Enter to quit. 
+0

感謝您的詳細解釋。其中一個要求是,它必須一個接一個地執行(完成每個任務可能需要30秒)。我如何確保當前任務在下一個任務開始之前完成? – My2ndLovE

+0

@ My2ndLovE,我意識到這一要求。我在輸出行前添加了一個時間戳,以便您可以看到排隊的作業實際上是按順序執行的(每個作業需要〜500ms)。 你用的是「任務」已經混淆,雖然我有點。 *您是否在使用「任務」封裝您的工作?因爲如果你使用'ConcurrentExclusiveSchedulerPair'的'ExclusiveScheduler'將會是更好的方法。 –

0

使用TPL數據流庫中的ActionBlock<T>。將其MaxDegreeOfParalellism設置爲1,即可完成。請注意,ASP.NET工作進程可以隨時回收(例如,由於計劃的回收,內存限制,服務器重新啓動或部署),因此排隊的工作可能會突然丟失,恕不另行通知。我建議你看看像MSMQ(或其他)的外部排隊解決方案,以獲得可靠的隊列。

+0

ActionBlock需要運行在.net framework 4.5?我正在使用vs2010 – My2ndLovE

+0

它現在已經可以作爲圖書館很長一段時間了。它最多需要4.0。 – usr

0

看看微軟的反應擴展。該庫包含一組可用的調度程序,它們遵循您所需的語義。

最適合您需要的是EventLoopScheduler。它會排列行動並且一個接一個地執行它們。如果它完成一個動作,並且隊列中有更多項目,它將順序處理同一線程上的動作,直到隊列爲空,然後它處理線程。當一個新的行動排隊時,它會創建一個新的線程。這是非常有效的。

的代碼是超級簡單,看起來像這樣:

var scheduler = new System.Reactive.Concurrency.EventLoopScheduler(); 

scheduler.Schedule(() => { /* action here */ }); 

如果你需要有一個新的線程執行的每一個排隊的話,就用這樣的:

var scheduler = new System.Reactive.Concurrency.NewThreadScheduler(); 

scheduler.Schedule(() => { /* action here */ }); 

很簡單。