2012-10-23 24 views
0

我在我的系統中有很多「長期運行但未綁定CPU」的「作業」。我想設置一個Worker角色來處理這些角色,但它們具有足夠的可擴展性,以至於一個Worker角色可以輕鬆地擁有10-20個線程同時處理「作業」。如何在Worker中設置多個線程並向其發送消息?

這裏有一些問題建議使用TPL,我確實有一些有限的經驗。然而,我不明白的是如何管理這些線程,以便有最大數量的線程,以及在釋放線程時如何分配它們。

稍微複雜一點是我想用Ninject來創建每個「工作」所需的服務。

下面是我形象在我的頭上的工作:

while (true) 
{ 
    // Don't go unless we have a free slot (how do I implement this?!) 
    if (FreeThreadExists) 
    { 
     // Get the next message 
     CloudQueueMessage ThisMessage = Queue.GetMessage(TimeSpan.FromMinutes(3)); 

     // Get the new job and inject services 
     Job MyJob = Kernel.Get<Job>(); 

     // Start this job 
     // Will I need to keep ahold of this Task? 
     // And how do I know when it's done so that FreeThreadExists changes? 
     Task.Factory.StartNew(() => MyJob.Run(ThisMessage)); 
    } 
    else 
    {  
     // Sleep to prevent choking 
     Thread.Sleep(500); 
    } 
} 

然後在該線程刪除完成的消息。基本上我試圖將一個Worker分成20個「實例」,而不會失去太多的Azure功能(特別是我希望隊列消息超時/重試功能)。

我對.NET線程並不熟悉,對此有什麼最好的方法呢?

編輯:哇,我完全忘了補充一個重要的觀點:這需要跨越多個工作人員。所以,10個Worker角色,每個都有10個線程,消息被UI前端排隊,然後由第一個Worker以一個空閒線程出列並運行。

+0

只是所有作業添加到線程池,讓它把剩下的事情。它將管理日程安排,創建線程等。除非您能證明讓系統處理它是個問題,否則我不會再煩心了。這樣的事情令人驚訝。 – Servy

+0

我是個笨蛋,相關信息已添加到問題中。對不起 – Stevoman

+0

不改變我的答案。只需將您擁有的所有任務放入線程池並讓它處理即可。很有可能它會比你更有效地安排它們,它會爲你節省甚至嘗試的努力。 – Servy

回答

1

試試這個僞代碼:

while (true) 
{ 
    int maxThreadsPerWorkerRole = 3;//assuming each worker role can handle 3 jobs simultaneously 
    var messages = Queue.GetMessages(3);//Get 3 messages from the queue 
    if (messages != null && messages.Count > 0)//Ensuring there is some work which needs to be done 
    { 
     var myTasks = new List<Task>(); 
     for (int i=0; i<messages.Count; i++) 
     { 
      Job MyJob = Kernel.Get<Job>();//Get the job 
      var task = Task.Factory.StartNew(() => MyJob.Run(messages[i])); 
      myTasks.Add(task); 
     } 
     Task.WaitAll(myTasks.ToArray());//Wait for all tasks to complete. 
     for (int i=0; i<messages.Count; i++) 
     { 
      //Write code to delete the message. 
     } 
     //Check if the queue is empty or not. If the queue is not empty, then repeat this loop 
     //Otherwise simply exit this loop. 
     if (Queue.RetrieveApproximateMessageCount() == 0) 
     { 
      break; 
     } 
    } 
} 

希望這有助於。

1

我通常也會涉及TPL,就像您在代碼中提到的那樣。作爲Gaurav方法的替代方法,請參閱下面的代碼。下面的僞代碼使用Parallel.For來控制線程的創建。每個線程啓動並運行一個無限循環;如果沒有工作,確保你睡了一下。

// Start 10 threads 
Parallel.For(0, 10, (i) => 
{ 
    while (true) 
    { 
    // Get message from queue 
    var msg = Queue.GetMessage(); 
    if (msg != null) 
    { 
     // Do some work here... 
     StartSomeJob(); 

     // Then when you are done, delete the message 
     Queue.DeleteMessage(msg); 
    } 
    // Wait 1 second before fetching next work item from queue 
    System.Threading.Thread.Sleep(1000); 
    } 
}); 
+0

可以每秒有10個線程打我的隊列嗎? – Stevoman

+0

基於此:http://msdn.microsoft.com/en-us/library/windowsazure/hh767287.aspx Azure隊列的最大吞吐量爲2000條/秒,因此我認爲該服務可以處理它。但是,由於無論隊列中是否有消息,每條獲得消息都會產生交易費用,因此您可能需要考慮這一事實,並以某種合理的方式發送請求。一些人推薦了一種從隊列中獲取消息的指數退避策略。 –

+1

在我之前執行的一些性能測試中,我無法達到2000 msg/sec,但我可以確認我能夠獲得數百次點擊/秒。我們的可擴展性需要我們在多個存儲帳戶中分割多個隊列。關於成本的一點是好的;一個回退stragey是有道理的。另一件需要考慮的事情是,你可以一次將多達10件物品出列;記住這一點。 –