2010-03-18 158 views
4

C#線程模式我有一個實現(),我最初使用ThreadPool.QueueUserWorkItem開始/結束調用模式線程我的工作一類。在線程上完成的工作不會循環,但需要一些時間來處理,因此工作本身不容易停止。,讓我沖洗

我現在有副作用,有人使用我的類正在調用Begin(帶回調)一大堆次來做很多處理,所以ThreadPool.QueueUserWorkItem正在創建大量線程來執行處理。這本身並不壞,但有些情況下,他們想放棄處理並開始一個新的過程,但他們被迫等待他們的第一個請求完成。

因爲ThreadPool.QueueUseWorkItem()不允許我取消線程我試圖想出一個更好的方法排隊工作,也許在我的類中使用一個明確的FlushQueue()方法來允許調用者放棄我的隊列中的工作。

任何人有適合我的需要線程模式有何建議?

編輯:我目前針對2.0框架。我目前認爲消費者/生產者隊列可能有效。有沒有人有關於沖洗隊列的想法?

編輯2問題澄清: 由於我每次呼叫者使用與回調開始創建線程池一個全新的線程時使用的開始/結束模式在我的課。這個調用進行了非常少量的處理,而不是我想要取消的地方。這是我希望阻止的隊列中未完成的工作。

ThreadPool默認情況下每個處理器創建250個線程的事實意味着,如果您要求ThreadPool使用QueueUserWorkItem()排隊大量項目,最終會創建大量的併發線程,停止。

主叫方能夠將CPU推到100%,不僅是工作,但工作的創作,因爲我的方式排隊的線程。

我通過使用生產者/消費者模式,我可以排隊這些線程到我自己的隊列中,讓我來主持,我創建多少個線程,以避免CPU秒殺創建所有的併發線程的思想。我可能會允許我的班級的調用者在放棄請求時清除隊列中的所有作業。

我目前正試圖自己實現這一點,但認爲SO是一個很好的地方,有人說看看這段代碼,否則你將無法沖洗,因爲這或沖洗不是正確的術語,你的意思是這個。

回答

1

編輯我的答案,因爲OP採用2.0不適用。對於任何讀取此問題並使用4.0的人,都離開並切換到CW如果您正在使用C#4.0,或者可能依賴某個早期版本的並行框架,則可以使用其內置的取消支持。這並不像取消線程那麼容易,但框架更加可靠(取消線程非常有吸引力,但也非常危險)。

裏德做了這樣一個很好的文章,你應該看一看

+0

感謝您的插件,賈裏德;)這,順便說一句,是這個「最好的」方法。可供選擇的方法確實不足,並且都有一些非常嚴重的問題。 – 2010-03-18 22:07:52

+0

我目前的目標是2.0框架,所以並行擴展不會幫助我的情況。 – Jeff 2010-03-18 22:33:35

1

我已經在過去使用的方法,但它肯定不是一個最好的做法是奉獻每個線程都有一個類實例,並在該類上有一箇中止標誌。然後在線程中定期調用的類上創建一個ThrowIfAborting方法(特別是如果線程正在運行一個循環,只需在每次迭代中調用它)。如果該標誌已被設置,ThrowIfAborting將簡單地拋出一個異常,該異常在線程的主要方法中被捕獲。只要確保在您中止時清理您的資源。

+0

沒關係,只要墮胎確實是例外情況,而不是經常發生的事情。雖然我覺得'ThrowIfAborting'會比'HandleAbort'更清晰,因爲它沒有做任何處理。 – 2010-03-18 22:10:40

+0

@丹尼爾謝謝 - 命名有時很難。我偷了你的名字,並用它更新了我的回覆。 – Dathan 2010-03-19 00:02:53

+0

+1從Jare​​d的回答中提到的取消框架來看,您的回答基本上就是他們如何去做的本質。唯一的區別是中止狀態保存在一個單獨的數據類型中,並且拋出方法被稱爲「ThrowIfCancellationRequested」。 – 2010-03-19 08:28:24

0

我已經通過使用圍繞1+ BackgroundWorker實例的包裝類來解決我認爲是確切的問題。

不幸的是,我無法發佈我的整個班級,但這裏有基本的概念及其侷限性。

使用: 您只需創建一個實例,並調用RunOrReplace(...)當你想取消您的老工人,並開始一個新的。如果老員工很忙,要求取消,然後另一個員工用於立即執行您的請求。

public class BackgroundWorkerReplaceable : IDisposable 
{ 

BackgroupWorker activeWorker = null; 
object activeWorkerSyncRoot = new object(); 
List<BackgroupWorker> workerPool = new List<BackgroupWorker>(); 

DoWorkEventHandler doWork; 
RunWorkerCompletedEventHandler runWorkerCompleted; 

public bool IsBusy 
{ 
    get { return activeWorker != null ? activeWorker.IsBusy; : false } 
} 

public BackgroundWorkerReplaceable(DoWorkEventHandler doWork, RunWorkerCompletedEventHandler runWorkerCompleted) 
{ 
    this.doWork = doWork; 
    this.runWorkerCompleted = runWorkerCompleted; 
    ResetActiveWorker(); 
} 

public void RunOrReplace(Object param, ...) // Overloads could include ProgressChangedEventHandler and other stuff 
{ 
    try 
    { 
     lock(activeWorkerSyncRoot) 
     { 
      if(activeWorker.IsBusy) 
      { 
       ResetActiveWorker(); 
      } 

      // This works because if IsBusy was false above, there is no way for it to become true without another thread obtaining a lock 
      if(!activeWorker.IsBusy) 
      { 
       // Optionally handle ProgressChangedEventHandler and other features (under the lock!) 

       // Work on this new param 
       activeWorker.RunWorkerAsync(param); 
      } 
      else 
      { // This should never happen since we create new workers when there's none available! 
       throw new LogicException(...); // assert or similar 
      } 
     } 
    } 
    catch(...) // InvalidOperationException and Exception 
    { // In my experience, it's safe to just show the user an error and ignore these, but that's going to depend on what you use this for and where you want the exception handling to be 
    } 
} 

public void Cancel() 
{ 
    ResetActiveWorker(); 
} 

public void Dispose() 
{ // You should implement a proper Dispose/Finalizer pattern 
    if(activeWorker != null) 
    { 
     activeWorker.CancelAsync(); 
    } 
    foreach(BackgroundWorker worker in workerPool) 
    { 
     worker.CancelAsync(); 
     worker.Dispose(); 
     // perhaps use a for loop instead so you can set worker to null? This might help the GC, but it's probably not needed 
    } 
} 

void ResetActiveWorker() 
{ 
    lock(activeWorkerSyncRoot) 
    { 
     if(activeWorker == null) 
     { 
      activeWorker = GetAvailableWorker(); 
     } 
     else if(activeWorker.IsBusy) 
     { // Current worker is busy - issue a cancel and set another active worker 
      activeWorker.CancelAsync(); // Make sure WorkerSupportsCancellation must be set to true [Link9372] 
      // Optionally handle ProgressEventHandler -= 
      activeWorker = GetAvailableWorker(); // Ensure that the activeWorker is available 
     } 
     //else - do nothing, activeWorker is already ready for work! 
    } 
} 
BackgroupdWorker GetAvailableWorker() 
{ 
    // Loop through workerPool and return a worker if IsBusy is false 
    // if the loop exits without returning... 
    if(activeWorker != null) 
    { 
     workerPool.Add(activeWorker); // Save the old worker for possible future use 
    } 
    return GenerateNewWorker(); 
} 
BackgroundWorker GenerateNewWorker() 
{ 
    BackgroundWorker worker = new BackgroundWorker(); 
    worker.WorkerSupportsCancellation = true; // [Link9372] 
    //worker.WorkerReportsProgress 
    worker.DoWork += doWork; 
    worker.RunWorkerCompleted += runWorkerCompleted; 
    // Other stuff 
    return worker; 
} 

} // class 

臨/ CON

這具有在開始你的新的執行非常低延時的優點,因爲新線程不必等待舊的結束

這是以從未獲得GC'd的BackgroundWorker對象的理論永無止境的增長爲代價的。但是,在實踐中,下面的代碼嘗試回收舊工作人員,因此通常不會遇到大量理想線程。如果您因爲計劃如何使用此類而擔心此問題,則可以實現一個啓動CleanUpExcessWorkers(...)方法的Timer,或者讓ResetActiveWorker()執行此清理(以較長的RunOrReplace(。)爲代價)。 ..)延遲)。

使用它的主要代價正是它的好處 - 它不等待前一個線程退出,例如,如果DoWork正在執行數據庫調用並且執行RunOrReplace(...)10次在線程快速連接時,數據庫調用可能不會立即取消 - 因此您將有10個查詢正在運行,從而使所有這些查詢都變得緩慢!這通常對Oracle來說很好,只會造成很小的延遲,但我沒有其他數據庫的經驗(爲了加快清理速度,我讓取消的工作人員告訴Oracle取消該命令)。正確使用下面描述的EventArgs大多解決了這個問題。

另一個小問題是,無論BackgroundWorker執行什麼代碼必須都與此概念兼容 - 它必須能夠安全地從被取消恢復。 DoWorkEventArgs和RunWorkerCompletedEventArgs具有您應該使用的Cancel/Canceled屬性。例如,如果您在DoWork方法中執行數據庫調用(主要是使用此類),則需要確保定期檢查這些屬性並執行相應的清理。

0

您可以擴展開始/結束模式以成爲開始/取消/結束模式。Cancel方法可以設置一個取消標誌工作線程定期輪詢。當工作線程檢測到取消請求時,它可以停止工作,根據需要清理資源,並報告操作是取消作爲End參數的一部分。