2013-06-25 64 views
3

我一直在研究這一兩天,並已閱讀關於多線程和blob客戶端的以前的問題並實施了他們的建議。任務線程和Azure CloudBlobClient

我已經將問題解決到下面。

沒有錯誤產生,只是沒有寫入threadtest容器(它已經存在)。有時候一個blob被寫入,然後什麼都沒有。

如果我增加睡眠到1秒一切都很好。

代碼的原因是基準Azure的blob寫入功能。 (我目前拿到8種單線程的情況下做70萬小時,但我敢肯定,我可以得到更高,如果我能想出解決辦法)

using System; 
using System.Net; 
using System.Threading; 
using Microsoft.WindowsAzure; 
using Microsoft.WindowsAzure.ServiceRuntime; 
using Microsoft.WindowsAzure.StorageClient; 
using System.Threading.Tasks; 

namespace ThreadedWriterTest 
{ 
    public class WorkerRole : RoleEntryPoint 
    { 
     private static CloudStorageAccount storageAccount; 

     public override void Run() 
     { 
      while (true) 
      { 
       Thread.Sleep(10); 
       Task.Factory.StartNew(()=> writeStuff()); 
      } 
     } 

     private void writeStuff() 
     { 
      CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient(); 
      threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString()); 
     } 



     public override bool OnStart() 
     { 
      ServicePointManager.DefaultConnectionLimit = 12; 
      storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX")); 
      return base.OnStart(); 
     } 
    } 
} 
+0

如果只需要調用'writeStuff()'直接在while循環它的工作原理。 –

+2

如果平均寫入時間超過10毫秒,那麼最終會有很多「任務」等待。也許這就是你的問題 - 你的虛擬機掛在CPU或內存上?在每個可用連接啓動一個線程,並讓每個線程緊密循環寫入可能會更好。或者,甚至更好的是,使用具有某種機制的異步寫入方法來控制未完成的請求。 –

+0

你是真的。這段代碼產生了太多的併發線程,我使用了一個信號量來限制數量,並且很快就會發佈一個解決方案。 –

回答

1

產生太多的併發線程上面的代碼,我的幼稚的做法用Thread.Sleep()節流並不足以限制線程數。

引入一個信號量(基本上是一個用於統計並行執行多少個線程的機制)可以大大地解決這個問題。我正在穩步增加併發限制和實例數量,並且已經超過100萬個小時。 (實際的代碼生成與奇數一個4MB〜隨機長度數據16-32K - 4個實例10個併發線程)

using System; 
using System.Net; 
using System.Threading; 
using Microsoft.WindowsAzure; 
using Microsoft.WindowsAzure.ServiceRuntime; 
using Microsoft.WindowsAzure.StorageClient; 
using System.Threading.Tasks; 

namespace ThreadedWriterTest 
{ 
    public class WorkerRole : RoleEntryPoint 
    { 
     private static CloudStorageAccount storageAccount; 
     private static Semaphore semaphore = new Semaphore(3, 3); 

     public override void Run() 
     { 
      while (true) 
      { 
       semaphore.WaitOne(); 
       Task.Factory.StartNew(()=> writeStuff()); 
      } 
     } 

     private void writeStuff() 
     { 
      CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient(); 
      threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString()); 
      semaphore.Release(); 
     } 



     public override bool OnStart() 
     { 
      ServicePointManager.DefaultConnectionLimit = 12; 
      storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX")); 
      return base.OnStart(); 
     } 
    } 
}