2014-01-16 68 views
5

我需要從ushort arrayB中相應長度的索引值中快速減去ushort arrayA中的每個值。如何從C#中快速減去另一個ushort數組?

另外,如果差值是負值,我需要存儲一個零值,而不是負值差值。

(長度= 327680是準確的,因爲我從另一幅相同尺寸的圖像中減去640x512圖像)。

下面的代碼目前正在使用〜20ms,如果可能的話,我想在〜5ms以內。不安全的代碼是可以的,但請提供一個例子,因爲我在編寫不安全的代碼方面沒有超強的技巧。

謝謝!

public ushort[] Buffer { get; set; } 

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
{ 
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
    sw.Start(); 

    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     int difference = Buffer[index] - backgroundBuffer[index]; 

     if (difference >= 0) 
      Buffer[index] = (ushort)difference; 
     else 
      Buffer[index] = 0; 
    } 

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 
} 

UPDATE:雖然它不是嚴格的C#,別人誰讀這個的好處,我終於結束了加入了C++ CLR類庫我用下面的代碼的解決方案。它運行在〜3.1ms。如果使用非託管C++庫,它運行在〜2.2ms。由於時差很小,我決定去託管的圖書館。

// SpeedCode.h 
#pragma once 
using namespace System; 

namespace SpeedCode 
{ 
    public ref class SpeedClass 
    { 
     public: 
      static void SpeedSubtractBackgroundFromBuffer(array<UInt16>^buffer, array<UInt16>^backgroundBuffer, int bufferLength); 
    }; 
} 

// SpeedCode.cpp 
// This is the main DLL file. 
#include "stdafx.h" 
#include "SpeedCode.h" 

namespace SpeedCode 
{ 
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16>^buffer, array<UInt16>^backgroundBuffer, int bufferLength) 
    { 
     for (int index = 0; index < bufferLength; index++) 
     { 
      buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index])); 
     } 
    } 
} 

然後我把它稱爲是這樣的:

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
    { 
     System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
     sw.Start(); 

     SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length); 

     Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 
    } 
+0

〜20ms的聲音很慢(也許你的機器是低規格?)。 *以防萬一*,您是否正在運行發佈版本而不進行調試? – Ergwun

+0

p /調用並使用PSUBW? – Yaur

+0

出於興趣,但你在灰度圖像上操作? –

回答

4

一些基準測試。

  1. SubtractBackgroundFromBuffer:這是從問題的原始方法。
  2. SubtractBackgroundFromBufferWithCalcOpt:這是TTat提高計算速度的原創方法。
  3. SubtractBackgroundFromBufferParallelFor:來自Selman22的答案的解決方案。
  4. SubtractBackgroundFromBufferBlockParallelFor:我的回答。與3.類似,但將處理分成4096個值。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach:傑夫的第一個答案。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack:傑夫的第二個答案。

更新

有趣的是,我可以通過使用(由布魯諾科斯塔所建議的)

Buffer[i] = (ushort)Math.Max(difference, 0); 

代替

得到一個小的速度增加(〜6%)爲 SubtractBackgroundFromBufferBlockParallelFor
if (difference >= 0) 
    Buffer[i] = (ushort)difference; 
else 
    Buffer[i] = 0; 

結果

請注意,這是每次運行1000次迭代的總時間。

SubtractBackgroundFromBuffer(ms):         2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):      2,245.42 
SubtractBackgroundFromBufferParallelFor(ms):      4,021.58 
SubtractBackgroundFromBufferBlockParallelFor(ms):     769.74 
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):   827.48 
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):  539.60 

所以從最好的方法結合了計算優化的一個小的收穫這些結果似乎和利用的Parallel.For對圖像的塊進行操作。您的里程當然會有所不同,並行代碼的性能對您正在運行的CPU非常敏感。

測試工具

我跑這在發行模式中的每個方法。我以這種方式開始並停止Stopwatch以確保只測量處理時間。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447); 

for (int i = 0; i < 1000; i++) 
{ 
    Buffer = GenerateRandomBuffer(327680, 128011992);     

    sw.Start(); 
    SubtractBackgroundFromBuffer(bgImg); 
    sw.Stop(); 
} 

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 


public static ushort[] GenerateRandomBuffer(int size, int randomSeed) 
{ 
    ushort[] buffer = new ushort[327680]; 
    Random random = new Random(randomSeed); 

    for (int i = 0; i < size; i++) 
    { 
     buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue); 
    } 

    return buffer; 
} 

方法

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
{ 
    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     int difference = Buffer[index] - backgroundBuffer[index]; 

     if (difference >= 0) 
      Buffer[index] = (ushort)difference; 
     else 
      Buffer[index] = 0; 
    } 
} 

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer) 
{ 
    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     if (Buffer[index] < backgroundBuffer[index]) 
     { 
      Buffer[index] = 0; 
     } 
     else 
     { 
      Buffer[index] -= backgroundBuffer[index]; 
     } 
    } 
} 

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer) 
{ 
    Parallel.For(0, Buffer.Length, (i) => 
    { 
     int difference = Buffer[i] - backgroundBuffer[i]; 
     if (difference >= 0) 
      Buffer[i] = (ushort)difference; 
     else 
      Buffer[i] = 0; 
    }); 
}   

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer) 
{ 
    int blockSize = 4096; 

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length/(double)blockSize), (j) => 
    { 
     for (int i = j * blockSize; i < (j + 1) * blockSize; i++) 
     { 
      int difference = Buffer[i] - backgroundBuffer[i]; 

      Buffer[i] = (ushort)Math.Max(difference, 0);      
     } 
    }); 
} 

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       if (Buffer[i] < backgroundBuffer[i]) 
       { 
        Buffer[i] = 0; 
       } 
       else 
       { 
        Buffer[i] -= backgroundBuffer[i]; 
       } 
      } 
     }); 
} 

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
    { 
     for (int i = range.Item1; i < range.Item2; ++i) 
     { 
      unsafe 
      { 
       var nonNegative = Buffer[i] > backgroundBuffer[i]; 
       Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
        *((int*)(&nonNegative))); 
      } 
     } 
    }); 
} 
+0

@BrunoCosta 1.我不確定我是否明白你的意思。 「再劃分」是什麼意思? 2.什麼讓你覺得這不是在整個陣列上運行? Blocksize是一個有點武斷的選擇,可能值得進一步的基準。 –

+0

我完全錯過了發現的代碼...但我還是相信,Parallel.Foreach會使它成爲一個自我分區。通過分區我的意思是可以分配許多線程來處理您的4096塊。但也許我錯了.. –

+0

@BrunoCosta別擔心,我們都做了幾次;) –

1

您可以嘗試Parallel.For

Parallel.For(0, Buffer.Length, (i) => 
{ 
    int difference = Buffer[i] - backgroundBuffer[i]; 
    if (difference >= 0) 
      Buffer[i] = (ushort) difference; 
    else 
     Buffer[i] = 0; 
}); 

更新:我已經嘗試過了,我看有你的情況最小差,但當陣列變大時差異變得更大

enter image description here

+0

@elgonzo'Parallel.For'不會爲每次迭代創建新任務:[是否Parallel.For每次迭代使用一個任務?](http://blogs.msdn.com/b/pfxteam/archive/2009/05/ 26/9641563.aspx?Redirected = true) – MarcinJuraszek

+0

對。哎呀,我的壞... – elgonzo

+0

這可能會節省幾個減和鑄造週期: if(Buffer [i] <= backgroundBuffer [i]){Buffer [i] = 0; } else {Buffer [i] - = backgroundBuffer [i]; } – TTat

1

在實際執行減法之前,您可能會首先檢查結果是否爲負值,從而可能會獲得較小的性能提升。這樣,如果結果爲負,則不需要執行減法。示例:

if (Buffer[index] > backgroundBuffer[index]) 
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]); 
else 
    Buffer[index] = 0; 
+0

這取決於抖動如何編譯代碼到程序集。即使節省了速度也不會超過幾微秒。 –

4

這是一個有趣的問題。

只有在測試結果不會爲負時執行減法(如TTat和Maximum Cookie所示)影響可以忽略不計,因爲此優化可能已由JIT編譯器執行。

並行化任務(如Selman22建議)是一個好主意,但是當環路一樣快,因爲它是在這種情況下,開銷最終outwaying,所以實際上Selman22's implementation在我的測試運行速度較慢的收益。我懷疑nick_w's benchmarks是用附加的調試器生成的,隱藏了這個事實。

並行化較大的塊(由nick_w建議)任務處理與開銷的問題,實際上可以產生更快的性能,但你不必自己計算塊 - 您可以使用Partitioner爲你做這個:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       if (Buffer[i] < backgroundBuffer[i]) 
       { 
        Buffer[i] = 0; 
       } 
       else 
       { 
        Buffer[i] -= backgroundBuffer[i]; 
       } 
      } 
     }); 
} 

上述方法的性能一直優於nick_w's手卷在我的測試組塊。

但是等等!除此之外還有更多。

放慢代碼的真正罪魁禍首不是賦值或算術。這是if聲明。它對性能的影響將主要受到您正在處理的數據的性質的影響。

nick_w's基準測試會爲兩個緩衝區生成相同大小的隨機數據。但是,我懷疑它很可能實際上在後臺緩衝區中具有較低的平均幅度數據。由於分支預測(如this classic SO answer中所述),此詳細信息可能很重要。

當後臺緩衝區中的值通常小於緩衝區中的值時,JIT編譯器會注意到這一點,並相應地對該分支進行優化。當每個緩衝區中的數據來自相同的隨機總體時,無法以超過50%的準確度猜測結果的if聲明。正是後一種情況下,nick_w是基準測試,在這些條件下,我們可以通過使用不安全的代碼將bool轉換爲整數並避免分支,進一步優化您的方法。 (請注意,下面的代碼依賴於bool在內存中的表示方式的實現細節,並且它適用於.NET 4.5中的場景,但這不一定是個好主意,並且在此處顯示用於說明目的。)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       unsafe 
       { 
        var nonNegative = Buffer[i] > backgroundBuffer[i]; 
        Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
         *((int*)(&nonNegative))); 
       } 
      } 
     }); 
} 

如果你真的希望關閉刮鬍子多一點的時間,那麼你可以通過切換語言C++/CLI遵循一個更安全的方式這種做法,因爲這將讓你使用一個布爾值在算術表達式,而不訴諸不安全代碼:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend) 
{ 
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend)); 
} 

您可以使用C++/CLI露出上述靜態方法純粹管理DLL,一個然後在你的C#代碼中使用它:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
    { 
     for (int i = range.Item1; i < range.Item2; ++i) 
     { 
      Buffer[i] = 
       MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]); 
     } 
    }); 
} 

這超過了hacky不安全的C#代碼上面。實際上,它的速度非常快,您可以使用C++/CLI編寫整個方法來忘記並行化,並且它仍然會勝過其他技術。

使用nick_w's test harness,上述方法將勝過迄今發佈在此處的任何其他建議。下面是結果我得到(1-4是他嘗試的情況下,5-7是在這個答案中概述的):

1. SubtractBackgroundFromBuffer(ms):        2,021.37 
2. SubtractBackgroundFromBufferWithCalcOpt(ms):     2,125.80 
3. SubtractBackgroundFromBufferParallelFor(ms):     3,431.58 
4. SubtractBackgroundFromBufferBlockParallelFor(ms):    1,401.36 
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):  1,197.76 
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 742.72 
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 499.27 

然而,在情景我希望你確實有,哪裏背景值通常較小,成功的分支預測提高全線的結果,和「黑客」,以避免if聲明實際上是慢:

這裏是我開始使用nick_w's test harness結果時,我在後臺緩衝限制值範圍0-6500(約10%緩衝區):

1. SubtractBackgroundFromBuffer(ms):         773.50 
2. SubtractBackgroundFromBufferWithCalcOpt(ms):      915.91 
3. SubtractBackgroundFromBufferParallelFor(ms):     2,458.36 
4. SubtractBackgroundFromBufferBlockParallelFor(ms):     663.76 
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):  658.05 
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 762.11 
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 494.12 

您可以看到結果1-5顯着改善,因爲它們現在從更好的分支預測中受益。結果6 & 7沒有太大變化,因爲它們避免了分支。

數據的這種改變已經完全改變了事情。在這種情況下,即使是最快的所有C#解決方案現在只比原始代碼快15%。

底線:一定要測試你挑代表性的數據,或者您的結果將是沒有意義的任何方法。

+0

你正在將一個bool *轉換爲一個int *(無效),並且正在假設一個真正的bool的數值(無效 - 不保證是一個)。雖然我喜歡避免分支的一般想法。 – usr

+0

@usr是的,該代碼確實有效,但你說得對,依靠這個實現細節並不是一個好主意 - 我會澄清的。正如我在答案中所說的那樣,我懷疑這個黑客實際上會隨着OP的數據而變慢。 – Ergwun

+0

@GeoffBattye:感謝您的基準測試和精彩評論!我希望我也可以將您的答案標記爲解決方案!順便說一句,backgroundBuffer幾乎總是會有比Buffer小的值,所以希望JIT編譯器能夠正確地注意和優化,正如你所說的那樣。 – nb1forxp

0

這是一個使用Zip()一個解決方案:

Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) => 
{ 
    return (ushort)Math.Max(0, x - y); 
}).ToArray(); 

它不執行,以及其他的答案,但它肯定是最短的解決方案。

0

怎麼樣,

Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i => 
    { 
     unsafe 
     { 
      var nonNegative = Buffer[i] > backgroundBuffer[i]; 
      Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
       *((int*)(&nonNegative))); 
     } 
    }); 
+0

這個出來比'Partitioner'的'Parallel.Foreach'大約慢10倍。令人驚訝的是,它遠遠落後。 – Ergwun

相關問題