2012-02-06 44 views
4

我有一個WCF服務託管在Windows服務中。這個服務公開2點的方法:關於同時處理單個和批量請求的體系結構

  1. bool ProcessClaim(string options, ref string xml);需要一些數據作爲輸入,執行一些處理(包括IO綁定操作,像DB查詢),並返回結果回來。
  2. void RunJob(string ticket);立即返回。根據ticket,從存儲器(例如DB或文件系統)讀取輸入數據,對每個數據元素進行相同的處理,並將結果保存回存儲器。批量通常由許多索賠組成。

用戶可以撥打ProcessClaim來處理單個請求,RunJob運行批處理。幾批可以同時運行。每個處理請求包裝爲Task,因此所有請求都並行執行。 問題不在於通過安排大量請求來允許批量塞滿處理隊列。換句話說,如果用戶執行大量批處理,它將會阻止小批量和單個處理請求大量時間。 於是我想出了下面的模式,由Albahari(很簡單)很好地描述:

public sealed class ProcessingQueue : IDisposable 
{ 
    private class WorkItem 
    { 
     public readonly TaskCompletionSource<string> TaskSource; 
     public readonly string Options; 
     public readonly string Claim; 
     public readonly CancellationToken? CancelToken; 

     public WorkItem(
      TaskCompletionSource<string> taskSource, 
      string options, 
      string claim, 
      CancellationToken? cancelToken) 
     { 
      TaskSource = taskSource; 
      Options = options; 
      Claim = claim; 
      CancelToken = cancelToken; 
     } 
    } 

    public ProcessingQueue() 
     : this(Environment.ProcessorCount) 
    { 
    } 

    public ProcessingQueue(int workerCount) 
    { 
     _taskQ = new BlockingCollection<WorkItem>(workerCount * 2); 

     for (var i = 0; i < workerCount; i++) 
      Task.Factory.StartNew(Consume); 
    } 

    public void Dispose() 
    { 
     _taskQ.CompleteAdding(); 
    } 

    private readonly BlockingCollection<WorkItem> _taskQ; 

    public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null) 
    { 
     var tcs = new TaskCompletionSource<string>(); 
     _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken)); 
     return tcs.Task; 
    } 

    public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null) 
    { 
     return Task<string>.Factory.StartNew(() => ProcessItem(options, claim)); 
    } 

    private void Consume() 
    { 
     foreach (var workItem in _taskQ.GetConsumingEnumerable()) 
     { 
      if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) 
       workItem.TaskSource.SetCanceled(); 
      else 
      { 
       try 
       { 
        workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim)); 
       } 
       catch (Exception ex) 
       { 
        workItem.TaskSource.SetException(ex); 
       } 
      } 
     } 
    } 

    private static string ProcessItem(string options, string claim) 
    { 
     // do some actual work here 
     Thread.Sleep(2000); // simulate work; 
     return options + claim; // return final result 
    } 
} 

靜態方法ProcessRequest可以用來處理單個請求,而實例方法EnqueueTask - 批量處理。當然,所有批次都必須使用單個共享實例ProcessingQueue。儘管這種方式非常好,可以控制同時運行多個批次的步伐,有一些看上去是錯的對我說:

  • 要保持工作的線程池手動
  • 難以猜測的最佳工作線程(I使用由默認處理器核的數量)螺紋
  • 一束保持當沒有批次運行阻塞,浪費系統資源
  • IO結合的處理塊的工作線程減少CPU的使用
  • 的效率的部分的數

我想知道,是否有更好的方式來處理這種情況?

更新: 要求之一是提供全功率的批次,當用戶執行一個批次的意思,並沒有其他傳入的請求,所有資源必須努力處理這批專用。

+0

這聽起來像你想要以更一致的方式分配負載。我會看看服務巴士來做到這一點。 http://nservicebus.com/ – 2012-02-06 15:20:44

回答

4

我想說,讓一個服務接口和一個主機容器來處理這兩種非常不同類型的需求可能是錯誤的。

您應該將服務分解爲兩個 - 一個返回對個別請求的響應,另一個將批量查詢排隊並在單個線程上處理它們。

通過這種方式,您可以爲您的實時消費者提供高可用性渠道,併爲您的消費者提供離線渠道。這些可以作爲單獨的問題進行部署和管理,使您可以在每個服務界面上提供不同的服務級別。

只是我對提出的架構的想法。

UPDATE

事實是,你的容量處理渠道是線下渠道。這種方式意味着消費者將不得不排隊等待並確定其要求返回的時間。

那麼工作隊列怎麼樣?每個作業都會在處理時獲取所有可用資源。處理作業後,調用者會收到通知,說明作業已完成。

+0

這是一個很好的觀點,我絕對應該將這些功能分開。但是,其中一個要求是爲批次提供全部電力,這意味着當用戶執行一個批次時,並且沒有其他傳入請求時,所有資源必須專用於處理該批次。您在單線程中處理批次的建議不符合此要求。另一方面,批量線程數量的增加會導致相同的初始問題。 – yuramag 2012-02-06 17:11:05

+0

我同意你的意見。我不知道如何最好地解決您的問題。您的挑戰是以某種方式將更多線程專用於處理批處理,同時最大化線程可用性。至少通過解耦您的通道,可以避免影響實時端點。 – 2012-02-10 11:58:01

+0

查看我的答案更新 – 2012-02-10 12:00:21

相關問題