此page爲您提供使用Windows Azure Service Bus時的性能提示。
關於並行處理,您可以有一個線程池進行處理,每當您收到消息時,您只需抓取其中一個池併爲其分配一條消息。您需要管理該池。例如,方法BeginReceiveBatch/EndReceiveBatch允許您從隊列(異步)中檢索多個「項目」,然後使用「AsParallel」進行轉換由前面的方法返回的IEnumerable並在多個線程中處理消息。
非常簡單和裸露的骨頭樣本:
var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);
messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
ProcessMessage(item);
});
即代碼檢索從隊列3個消息和過程然後在「3個線程」(注:它不能保證它將使用3個線程,將.NET分析系統資源,如果需要,它將使用多達3個線程)
您也可以刪除「WithDegreeOfParallelism」部分,.NET將使用它需要的任何線程。
在一天結束時有多種方式可以做到這一點,您必須決定哪一種方法對您更好。
UPDATE:樣品,而無需使用ASYNC/AWAIT
這是使用常規的開始/結束異步模式的基本(沒有錯誤檢查)樣品。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1
{
public class WorkerRole : RoleEntryPoint
{
// The name of your queue
const string QueueName = "QUEUE_NAME";
const int MaxThreads = 3;
// QueueClient is thread-safe. Recommended that you cache
// rather than recreating it on every request
QueueClient Client;
bool IsStopped;
int dequeueRequests = 0;
public override void Run()
{
while (!IsStopped)
{
// Increment Request Counter
Interlocked.Increment(ref dequeueRequests);
Trace.WriteLine(dequeueRequests + " request(s) in progress");
Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client);
// If we have made too many requests, wait for them to finish before requesting again.
while (dequeueRequests >= MaxThreads && !IsStopped)
{
System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work");
Thread.Sleep(2000);
}
}
}
void ProcessUrgentEmails(IAsyncResult result)
{
var qc = result.AsyncState as QueueClient;
var sendEmail = qc.EndReceive(result);
// We have received a message or has timeout... either way we decrease our counter
Interlocked.Decrement(ref dequeueRequests);
// If we have a message, process it
if (sendEmail != null)
{
var r = new Random();
// Process the message
Trace.WriteLine("Processing message: " + sendEmail.MessageId);
System.Threading.Thread.Sleep(r.Next(10000));
// Mark it as completed
sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
}
}
void ProcessEndComplete(IAsyncResult result)
{
var bm = result.AsyncState as BrokeredMessage;
bm.EndComplete(result);
Trace.WriteLine("Completed message: " + bm.MessageId);
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Create the queue if it does not exist already
string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.QueueExists(QueueName))
{
namespaceManager.CreateQueue(QueueName);
}
// Initialize the connection to Service Bus Queue
Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
IsStopped = false;
return base.OnStart();
}
public override void OnStop()
{
// Waiting for all requestes to finish (or timeout) before closing
while (dequeueRequests > 0)
{
System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping");
Thread.Sleep(2000);
}
// Close the connection to Service Bus Queue
IsStopped = true;
Client.Close();
base.OnStop();
}
}
}
希望它有幫助。
謝謝丹尼爾的迴應。我在[MSDN](http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx)上實施(稍作改動)示例。它工作正常,但[ProcessEndComplete](http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx#code-snippet-2)方法根本沒有被調用,所以相同的消息被處理過再次。 – olatunjee
簽名爲'BrokeredMessage.BeginComplete(AsyncCallBack,BrokeredMessage);'的方法根本就沒有被調用。 – olatunjee
當應用程序從隊列中檢索一條消息時,會調用ProcessEndComplete,並在這個方法中調用'BeginComplete'。我猜你的代碼已經達到了'ProcessEndComplete',對嗎?...當你調用'BeginComplete'時你有什麼錯誤嗎? – Daniel