我有一個WCF服務託管在Windows服務中。這個服務公開2點的方法:關於同時處理單個和批量請求的體系結構
bool ProcessClaim(string options, ref string xml);
需要一些數據作爲輸入,執行一些處理(包括IO綁定操作,像DB查詢),並返回結果回來。void RunJob(string ticket);
立即返回。根據ticket
,從存儲器(例如DB或文件系統)讀取輸入數據,對每個數據元素進行相同的處理,並將結果保存回存儲器。批量通常由許多索賠組成。
用戶可以撥打ProcessClaim
來處理單個請求,RunJob
運行批處理。幾批可以同時運行。每個處理請求包裝爲Task
,因此所有請求都並行執行。 問題不在於通過安排大量請求來允許批量塞滿處理隊列。換句話說,如果用戶執行大量批處理,它將會阻止小批量和單個處理請求大量時間。 於是我想出了下面的模式,由Albahari(很簡單)很好地描述:
public sealed class ProcessingQueue : IDisposable
{
private class WorkItem
{
public readonly TaskCompletionSource<string> TaskSource;
public readonly string Options;
public readonly string Claim;
public readonly CancellationToken? CancelToken;
public WorkItem(
TaskCompletionSource<string> taskSource,
string options,
string claim,
CancellationToken? cancelToken)
{
TaskSource = taskSource;
Options = options;
Claim = claim;
CancelToken = cancelToken;
}
}
public ProcessingQueue()
: this(Environment.ProcessorCount)
{
}
public ProcessingQueue(int workerCount)
{
_taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
for (var i = 0; i < workerCount; i++)
Task.Factory.StartNew(Consume);
}
public void Dispose()
{
_taskQ.CompleteAdding();
}
private readonly BlockingCollection<WorkItem> _taskQ;
public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
{
var tcs = new TaskCompletionSource<string>();
_taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
return tcs.Task;
}
public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
{
return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
}
private void Consume()
{
foreach (var workItem in _taskQ.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
{
try
{
workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
}
private static string ProcessItem(string options, string claim)
{
// do some actual work here
Thread.Sleep(2000); // simulate work;
return options + claim; // return final result
}
}
靜態方法ProcessRequest
可以用來處理單個請求,而實例方法EnqueueTask
- 批量處理。當然,所有批次都必須使用單個共享實例ProcessingQueue
。儘管這種方式非常好,可以控制同時運行多個批次的步伐,有一些看上去是錯的對我說:
- 要保持工作的線程池手動
- 難以猜測的最佳工作線程(I使用由默認處理器核的數量)螺紋
- 一束保持當沒有批次運行阻塞,浪費系統資源
- IO結合的處理塊的工作線程減少CPU的使用 的效率的部分的數
我想知道,是否有更好的方式來處理這種情況?
更新: 要求之一是提供全功率的批次,當用戶執行一個批次的意思,並沒有其他傳入的請求,所有資源必須努力處理這批專用。
這聽起來像你想要以更一致的方式分配負載。我會看看服務巴士來做到這一點。 http://nservicebus.com/ – 2012-02-06 15:20:44