2013-02-15 56 views
6

我已經實現了一個簡單函數的正常和並行版本,該函數根據32bppArgb位圖計算直方圖。正常版本在1920x1080圖像上大約需要0.03秒,而並行版本則需要0.07秒。並行化直方圖函數

線程開銷真的很重嗎?除了Parallel.For還有其他一些構造可以加速這個過程嗎?自從我使用30fps視頻以來,我需要加快速度。

下面是簡化的代碼:

public sealed class Histogram 
{ 
    public int MaxA = 0; 
    public int MaxR = 0; 
    public int MaxG = 0; 
    public int MaxB = 0; 
    public int MaxT = 0; 

    public int [] A = null; 
    public int [] R = null; 
    public int [] G = null; 
    public int [] B = null; 

    public Histogram() 
    { 
     this.A = new int [256]; 
     this.R = new int [256]; 
     this.G = new int [256]; 
     this.B = new int [256]; 

     this.Initialize(); 
    } 

    public void Initialize() 
    { 
     this.MaxA = 0; 
     this.MaxR = 0; 
     this.MaxG = 0; 
     this.MaxB = 0; 
     this.MaxT = 0; 

     for (int i = 0; i < this.A.Length; i++) 
      this.A [i] = 0; 
     for (int i = 0; i < this.R.Length; i++) 
      this.R [i] = 0; 
     for (int i = 0; i < this.G.Length; i++) 
      this.G [i] = 0; 
     for (int i = 0; i < this.B.Length; i++) 
      this.B [i] = 0; 
    } 

    public void ComputeHistogram (System.Drawing.Bitmap bitmap, bool parallel = false) 
    { 
     System.Drawing.Imaging.BitmapData data = null; 

     data = bitmap.LockBits 
     (
      new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height), 
      System.Drawing.Imaging.ImageLockMode.ReadOnly, 
      System.Drawing.Imaging.PixelFormat.Format32bppArgb 
     ); 

     try 
     { 
      ComputeHistogram(data, parallel); 
     } 
     catch 
     { 
      bitmap.UnlockBits(data); 

      throw; 
     } 

     bitmap.UnlockBits(data); 
    } 

    public void ComputeHistogram (System.Drawing.Imaging.BitmapData data, bool parallel = false) 
    { 
     int stride = System.Math.Abs(data.Stride); 

     this.Initialize(); 

     if (parallel) 
     { 
      unsafe 
      { 
       System.Threading.Tasks.Parallel.For 
       (
        0, 
        data.Height, 
        new System.Threading.Tasks.ParallelOptions() { MaxDegreeOfParallelism = System.Environment.ProcessorCount }, 
        y => 
        { 
         byte* pointer = ((byte*) data.Scan0) + (stride * y); 

         for (int x = 0; x < stride; x += 4) 
         { 
          this.B [pointer [x + 0]]++; 
          this.G [pointer [x + 1]]++; 
          this.R [pointer [x + 2]]++; 
          this.A [pointer [x + 3]]++; 
         } 
        } 
       ); 
      } 
     } 
     else 
     { 
      unsafe 
      { 
       for (int y = 0; y < data.Height; y++) 
       { 
        byte* pointer = ((byte*) data.Scan0) + (stride * y); 

        for (int x = 0; x < stride; x += 4) 
        { 
         this.B [pointer [x + 0]]++; 
         this.G [pointer [x + 1]]++; 
         this.R [pointer [x + 2]]++; 
         this.A [pointer [x + 3]]++; 
        } 
       } 
      } 
     } 

     for (int i = 0; i < this.A.Length; i++) 
      if (this.MaxA < this.A [i]) this.MaxA = this.A [i]; 
     for (int i = 0; i < this.R.Length; i++) 
      if (this.MaxR < this.R [i]) this.MaxR = this.R [i]; 
     for (int i = 0; i < this.G.Length; i++) 
      if (this.MaxG < this.G [i]) this.MaxG = this.G [i]; 
     for (int i = 0; i < this.B.Length; i++) 
      if (this.MaxB < this.B [i]) this.MaxB = this.B [i]; 

     if (this.MaxT < this.MaxA) this.MaxT = this.MaxA; 
     if (this.MaxT < this.MaxR) this.MaxT = this.MaxR; 
     if (this.MaxT < this.MaxG) this.MaxT = this.MaxG; 
     if (this.MaxT < this.MaxB) this.MaxT = this.MaxB; 
    } 
} 
+2

您是否嘗試讓每個線程計算多於1行?可能使他們處理10-20可能會加快一點。 – 2013-02-15 16:05:25

+0

那麼我已經分組了一個循環,運行1920次,並有四條語句。不知道如何構建它。有什麼建議麼? – 2013-02-15 16:07:50

+1

對於傳入「Parallel.For」的lambda,嘗試從'y'循環到'y' +(您必須找到某個最佳數字)。當然,這意味着將'parallel.For'的第二個參數從'data.Height'調整到其他的參數。 – 2013-02-15 16:10:13

回答

8

嗯,首先,你已經有了一個巨大的錯誤在你的並行循環:

你將有多個線程訪問,增加和更新共享陣列 - 只是運行示例代碼在同由於內在的競態條件,多次圖像會導致截然不同的結果。

但這不是你問的。

至於你爲什麼看到使用並行實現的性能下降,簡單的答案是你可能沒有在每個並行任務的主體中做足夠的工作來抵消創建新任務的「啓動成本」 ,安排它等等。

也許更關鍵的是,我相信你在內存中跳來跳去L1/L2緩存 - 每個任務線程都會嘗試加載它認爲的內容將需要進入緩存內存,但隨着索引到處都在建立,您不再創建一致的訪問模式,因此您每次嘗試訪問位圖緩衝區或內部數組時都可能會發生緩存未命中。

還有越來越善於位圖的只讀數據,而無需使用不安全的代碼的同樣性能方法......實際上,我們做的第一:

所以,你必須通過調用LockBits,一個指向非託管內存。讓我們把它的一個副本:

System.Drawing.Imaging.BitmapData data = null; 
data = bitmap.LockBits 
(
    new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height), 
    System.Drawing.Imaging.ImageLockMode.ReadOnly, 
    System.Drawing.Imaging.PixelFormat.Format32bppArgb 
); 

// For later usage 
var imageStride = data.Stride; 
var imageHeight = data.Height; 

// allocate space to hold the data 
byte[] buffer = new byte[data.Stride * data.Height]; 

// Source will be the bitmap scan data 
IntPtr pointer = data.Scan0; 

// the CLR marshalling system knows how to move blocks of bytes around, FAST. 
Marshal.Copy(pointer, buffer, 0, buffer.Length); 

// and now we can unlock this since we don't need it anymore 
bitmap.UnlockBits(data); 

ComputeHistogram(buffer, imageStride, imageHeight, parallel); 

現在,作爲競爭條件 - 你可以在一個合理的高性能方式使用Interlocked電話顛簸起來計數(注意克服這個!多線程編程是辛苦了,這是完全有可能我的解決方案在這裏就不完美!

public void ComputeHistogram (byte[] data, int stride, int height, bool parallel = false) 
{ 
    this.Initialize(); 

    if (parallel) 
    { 
     System.Threading.Tasks.Parallel.For 
     (
      0, 
      height, 
      new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
      y => 
      { 
       int startIndex = (stride * y); 
       int endIndex = stride * (y+1); 
       for (int x = startIndex; x < endIndex; x += 4) 
       { 
        // Interlocked actions are more-or-less atomic 
        // (caveats abound, but this should work for us) 
        Interlocked.Increment(ref this.B[data[x]]); 
        Interlocked.Increment(ref this.G[data[x+1]]); 
        Interlocked.Increment(ref this.R[data[x+2]]); 
        Interlocked.Increment(ref this.A[data[x+3]]); 
       } 
      } 
     ); 
    } 
    else 
    { 
     // the original way is ok for non-parallel, since only one 
     // thread is mucking around with the data 
    } 

    // Sorry, couldn't help myself, this just looked "cleaner" to me 
    this.MaxA = this.A.Max(); 
    this.MaxR = this.R.Max(); 
    this.MaxG = this.G.Max(); 
    this.MaxB = this.B.Max(); 
    this.MaxT = new[] { this.MaxA, this.MaxB, this.MaxG, this.MaxR }.Max(); 
} 

那麼,這是什麼做的運行時行爲?

不是很多,但至少現在平行叉計算出正確的結果。:)

使用真正的小氣鬼試驗檯:

void Main() 
{  
    foreach(var useParallel in new[]{false, true}) 
    { 
     var totalRunTime = TimeSpan.Zero; 
     var sw = new Stopwatch(); 
     var runCount = 10; 
     for(int run=0; run < runCount; run++) 
     { 
      GC.Collect(); 
      GC.WaitForPendingFinalizers(); 
      GC.Collect(); 
      sw.Reset(); 
      sw.Start(); 
      var bmp = Bitmap.FromFile(@"c:\temp\banner.bmp") as Bitmap; 
      var hist = new Histogram(); 
      hist.ComputeHistogram(bmp, useParallel); 
      sw.Stop(); 
      totalRunTime = totalRunTime.Add(sw.Elapsed); 
     } 
     Console.WriteLine("Parallel={0}, Avg={1} ms", useParallel, totalRunTime.TotalMilliseconds/runCount); 
    } 
} 

我得到的結果是這樣的:

Parallel=False, Avg=1.69777 ms 
Parallel=True, Avg=5.33584 ms 

正如你所看到的,我們仍然沒有解決您原來的問題。 :)

那麼讓我們來刺傷使得並行工作「越多越好」:

讓我們來看看什麼是「讓更多的工作」的任務呢:

if (parallel) 
{ 
    var batchSize = 2; 
    System.Threading.Tasks.Parallel.For 
    (
     0, 
     height/batchSize, 
     new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount }, 
     y => 
     { 
      int startIndex = (stride * y * batchSize); 
      int endIndex = startIndex + (stride * batchSize); 
      for (int x = startIndex; x < endIndex; x += 4) 
      { 
       // Interlocked actions are more-or-less atomic 
       // (caveats abound, but this should work for us) 
       Interlocked.Increment(ref this.B[data[x]]); 
       Interlocked.Increment(ref this.G[data[x+1]]); 
       Interlocked.Increment(ref this.R[data[x+2]]); 
       Interlocked.Increment(ref this.A[data[x+3]]); 
      } 
     } 
    ); 
} 

結果:

Parallel=False, Avg=1.70273 ms 
Parallel=True, Avg=4.82591 ms 

哦,看起來很有前途......我不知道會發生什麼,因爲我們變化了batchSize

我們正是如此改變我們的試驗檯:

void Main() 
{  
    foreach(var useParallel in new[]{false, true}) 
    { 
     for(int batchSize = 1; batchSize < 1024; batchSize <<= 1) 
     { 
      var totalRunTime = TimeSpan.Zero; 
      var sw = new Stopwatch(); 
      var runCount = 10; 
      for(int run=0; run < runCount; run++) 
      { 
       GC.Collect(); 
       GC.WaitForPendingFinalizers(); 
       GC.Collect(); 
       sw.Reset(); 
       sw.Start(); 
       var bmp = Bitmap.FromFile(@"c:\temp\banner.bmp") as Bitmap; 
       var hist = new Histogram(); 
       hist.ComputeHistogram(bmp, useParallel, batchSize); 
       sw.Stop(); 
       totalRunTime = totalRunTime.Add(sw.Elapsed); 
      } 
      Console.WriteLine("Parallel={0}, BatchSize={1} Avg={2} ms", useParallel, batchSize, totalRunTime.TotalMilliseconds/runCount); 
     }   
    } 
} 

結果:(只顯示並行=正確的,因爲非平行不會改變)

Parallel=True, BatchSize=1 Avg=5.57644 ms 
Parallel=True, BatchSize=2 Avg=5.49982 ms 
Parallel=True, BatchSize=4 Avg=5.20434 ms 
Parallel=True, BatchSize=8 Avg=5.1721 ms 
Parallel=True, BatchSize=16 Avg=5.00405 ms 
Parallel=True, BatchSize=32 Avg=4.44973 ms 
Parallel=True, BatchSize=64 Avg=2.28332 ms 
Parallel=True, BatchSize=128 Avg=1.39957 ms 
Parallel=True, BatchSize=256 Avg=1.29156 ms 
Parallel=True, BatchSize=512 Avg=1.28656 ms 

我們似乎接近漸近一旦我們在批量大小上達到了64-128的範圍,雖然當然您的里程可能取決於您的位圖尺寸等。

我希望這有助於!從等待生產完成的那一天起,這是一個有趣的分心! :)

+0

謝謝!像這樣的答案具有傳染性,並鼓勵SO'ers回答更多問題。布拉沃。 – 2013-02-16 01:48:27

+0

關於memcopy,我假設你這樣做只是爲了避免不安全的代碼是正確的? – 2013-02-16 02:21:36

+0

我不知道是否有一種方法可以根據圖像大小以編程方式計算最佳批量大小。當然,你可以使用啓發式方法,但是不能很好地適用於不同的機器。或者使用類似於您的測試裝置在另一個線程中進行運行時調整。 – 2013-02-16 02:24:42

1

創建線程有相當顯著的開銷。執行的運行速度可能比單線程版本快得多,但完成速度太快以至於無法彌補此初始開銷。

如果你這樣做每一幀,它只會減慢你的速度。但是,如果手動創建線程池,手動分配工作,併爲每個框架重新使用線程,則可能會發現通過第二幀或第三幀代碼火箭通過單線程版本。