2017-04-21 59 views
0

我有一個類至極執行一些數據處理:如何在多個處理器之間分派工作?

class Processor 
{ 
    public Processor() { 
     // Load lot of data 
    } 
    public string GetResult(string input) { 
     // ... 
    } 
} 

我需要實現一個服務至極公開HTTP API這一類。我使用Owin和Microsoft.AspNet。* libs來託管HTTP Web API。對於每個請求,它創建一個新的線程來處理它,但是我無法在每個請求上實例化Processor,因爲在構造器中加載一些數據需要很長時間。此外,我不能重用來自不同線程的一個實例,因爲它不是線程安全的。但是我可以在服務啓動時實例化Processor的幾個實例,然後在它們之間調度工作。假設我允許多達20個併發HTTP請求用於我的服務。我創建的Processor 20個實例,並添加忙標誌到類:

class Processor 
{ 
    public bool Busy { get; set; } 
    // ... 
} 

我寫Dispatcher類是這樣的:

public class ServiceController : ApiController 
{ 
    static Dispatcher _dispatcher = new Dispatcher(20); 

    [Route("result")] 
    [HttpGet] 
    public string Result(string input) 
    { 
     return _dispatcher.GetResult(input); 
    } 
} 

class Dispatcher 
{ 
    readonly Processor[] _processors; 
    readonly SemaphoreSlim _semaphore; 

    public Dispatcher(int maxProcessors) 
    { 
     _semaphore = new SemaphoreSlim(maxProcessors); 
     _processors = new Processor[maxProcessors]; 
     // Instantiate Processors, etc... 
    } 

    public string GetResult(string input) 
    { 
     try 
     { 
      _semaphore.Wait(); // Surplus requests will wait here. 
      Processor processor; 
      lock (_processors) 
      { 
       // It is guaranteed that such processor exists if we entered the semaphore. 
       processor = _processors.First(p => !p.Busy); 
       processor.Busy = true; 
      } 
      var result = processor.GetResult(input); 
      processor.Busy = false; 
      return result; 
     } 
     finally 
     { 
      _semaphore.Release(); 
     } 
    } 
} 

然後我基本上可以ApiController調用它通過Dispatcher

它是否爲我的目的正確實施? 我測試了它,它可以工作,但我不知道我是否重新創建了輪子,並且.NET Framework已經有所準備用於我的案例,或者它可以更容易地實現。

+0

https://msdn.microsoft.com/en-us/library/dd267312 – SLaks

+0

@SLaks這是不太清楚如何在我的情況下使用'BlockingCollection',因爲我不僅需要分配工作項目,還需要收集結果。假設我有一些從'BlockingCollection'消耗的工人,我應該等到相應的結果準備好並獲得它。這是可以解決的,但我不知道它比這更容易... – greatvovan

+1

看起來像一個非常好的實施給我。我不知道可以處理你的用例的.Net配置。如果處理器數量總是等於最大併發請求數量,那麼信號量可能是多餘的(較低級別的信號量已經可以保護處理器池免於飢餓);但是,如果Processor類受CPU限制而非IO限制,則可能需要將處理器池大小調整爲可用處理內核的數量,這樣可最大限度地減少併發線程之間的上下文切換,並可能獲得更好的整體性能。 – BlueStrat

回答

0

基本上在您的類將要在線程中運行,創建一個事件和事件處理程序。然後旋轉此任務的對象可以註冊該事件。當它被任務提出時(在這種情況下,當事件完成時你會提出事件),你可以做一些事情,即。給它更多的工作。 public event TaskCompleteEventHandler OnComplete; public event TaskErrorEventHandler OnError;

註冊到對象的事件時起轉類:

task.OnComplete += TaskComplete; 
task.OnError += TaskComplete; 

創建的功能

在將在子線程運行的類創建活動調用類將處理事件:

public void TaskComplete() 
{ 
//give the thread more work 
} 
+0

我越來越意識到你誤解了這個問題。就我而言,我必須從我能夠派遣工作的地方獲得一個入口點。此入口點是處理對Web服務的特定URL的請求的方法,並且此方法由不同的線程(很可能)同時調用。沒有任何有限的項目要處理,我必須儘快處理傳入的請求並返回結果。 – greatvovan

相關問題