2013-05-27 40 views
3

問題:我收到大量電子郵件,目前平均在任何時間發送隊列中的10封電子郵件。我一次處理隊列中的代碼;也就是接收郵件,處理郵件並最終發送郵件。這會導致在向用戶註冊服務時向用戶發送電子郵件相當長的時間。在Azure服務總線中並行處理郵件

我已經開始考慮修改代碼爲process the messages in parrallel說5異步。我想象寫一個方法,並使用CTP並行調用這個方法,比如5次。

我在如何實現這一點上迷失了方向。犯錯誤的代價非常大,因爲如果出現問題,用戶會感到失望。

Request:我需要幫助編寫在Azure服務總線中並行處理消息的代碼。 謝謝。

My code in a nutshell. 

Public .. Run() 
{ 
    _myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient); 
} 

void ProcessUrgentEmails(IAsyncResult result) 
{ 
    //casted the `result` as a QueueClient 
    //Used EndReceive on an object of BrokeredMessage 
    //I processed the message, then called 
    sendEmail.BeginComplete(ProcessEndComplete, sendEmail); 
} 


//This method is never called despite having it as callback function above. 
void ProcessEndComplete(IAsyncResult result) 
{ 
    Trace.WriteLine("ENTERED ProcessEndComplete method..."); 
    var bm = result.AsyncState as BrokeredMessage; 
    bm.EndComplete(result); 
} 

回答

4

page爲您提供使用Windows Azure Service Bus時的性能提示。

關於並行處理,您可以有一個線程池進行處理,每當您收到消息時,您只需抓取其中一個池併爲其分配一條消息。您需要管理該池。例如,方法BeginReceiveBatch/EndReceiveBatch允許您從隊列(異步)中檢索多個「項目」,然後使用「AsParallel」進行轉換由前面的方法返回的IEnumerable並在多個線程中處理消息。

非常簡單和裸露的骨頭樣本:

var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch); 

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item => 
{ 
    ProcessMessage(item); 
}); 

即代碼檢索從隊列3個消息和過程然後在「3個線程」(注:它不能保證它將使用3個線程,將.NET分析系統資源,如果需要,它將使用多達3個線程)

您也可以刪除「WithDegreeOfParallelism」部分,.NET將使用它需要的任何線程。

在一天結束時有多種方式可以做到這一點,您必須決定哪一種方法對您更好。

UPDATE:樣品,而無需使用ASYNC/AWAIT

這是使用常規的開始/結束異步模式的基本(沒有錯誤檢查)樣品。

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Net; 
using System.Threading; 
using Microsoft.ServiceBus; 
using Microsoft.ServiceBus.Messaging; 
using Microsoft.WindowsAzure; 
using Microsoft.WindowsAzure.ServiceRuntime; 

namespace WorkerRoleWithSBQueue1 
{ 
    public class WorkerRole : RoleEntryPoint 
    { 
     // The name of your queue 
     const string QueueName = "QUEUE_NAME"; 
     const int MaxThreads = 3; 

     // QueueClient is thread-safe. Recommended that you cache 
     // rather than recreating it on every request 
     QueueClient Client; 
     bool IsStopped; 
     int dequeueRequests = 0; 

     public override void Run() 
     { 
      while (!IsStopped) 
      { 
       // Increment Request Counter 
       Interlocked.Increment(ref dequeueRequests); 

       Trace.WriteLine(dequeueRequests + " request(s) in progress"); 

       Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client); 

       // If we have made too many requests, wait for them to finish before requesting again. 
       while (dequeueRequests >= MaxThreads && !IsStopped) 
       { 
        System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work"); 
        Thread.Sleep(2000); 
       } 

      } 
     } 


     void ProcessUrgentEmails(IAsyncResult result) 
     { 
      var qc = result.AsyncState as QueueClient; 
      var sendEmail = qc.EndReceive(result); 
      // We have received a message or has timeout... either way we decrease our counter 
      Interlocked.Decrement(ref dequeueRequests); 

      // If we have a message, process it 
      if (sendEmail != null) 
      { 
       var r = new Random(); 
       // Process the message 
       Trace.WriteLine("Processing message: " + sendEmail.MessageId); 
       System.Threading.Thread.Sleep(r.Next(10000)); 

       // Mark it as completed 
       sendEmail.BeginComplete(ProcessEndComplete, sendEmail); 
      } 

     } 


     void ProcessEndComplete(IAsyncResult result) 
     { 
      var bm = result.AsyncState as BrokeredMessage; 
      bm.EndComplete(result); 
      Trace.WriteLine("Completed message: " + bm.MessageId); 
     } 


     public override bool OnStart() 
     { 
      // Set the maximum number of concurrent connections 
      ServicePointManager.DefaultConnectionLimit = 12; 

      // Create the queue if it does not exist already 
      string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
      var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); 
      if (!namespaceManager.QueueExists(QueueName)) 
      { 
       namespaceManager.CreateQueue(QueueName); 
      } 

      // Initialize the connection to Service Bus Queue 
      Client = QueueClient.CreateFromConnectionString(connectionString, QueueName); 
      IsStopped = false; 
      return base.OnStart(); 
     } 

     public override void OnStop() 
     { 
      // Waiting for all requestes to finish (or timeout) before closing 
      while (dequeueRequests > 0) 
      { 
       System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping"); 
       Thread.Sleep(2000); 
      } 
      // Close the connection to Service Bus Queue 
      IsStopped = true; 
      Client.Close(); 
      base.OnStop(); 
     } 
    } 
} 

希望它有幫助。

+0

謝謝丹尼爾的迴應。我在[MSDN](http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx)上實施(稍作改動)示例。它工作正常,但[ProcessEndComplete](http://msdn.microsoft.com/en-us/library/windowsazure/hh528527.aspx#code-snippet-2)方法根本沒有被調用,所以相同的消息被處理過再次。 – olatunjee

+0

簽名爲'BrokeredMessage.BeginComplete(AsyncCallBack,BrokeredMessage);'的方法根本就沒有被調用。 – olatunjee

+0

當應用程序從隊列中檢索一條消息時,會調用ProcessEndComplete,並在這個方法中調用'BeginComplete'。我猜你的代碼已經達到了'ProcessEndComplete',對嗎?...當你調用'BeginComplete'時你有什麼錯誤嗎? – Daniel