2013-07-24 68 views
5

在我的代碼中,我必須處理webmasocket數據包的「unmasking」,這實質上意味着XOR'ing任意長度的未對齊數據。感謝SO(Websocket data unmasking/multi byte xor),我已經發現瞭如何(希望)使用SSE2/AVX2擴展來加快速度,但現在看,我認爲我對未對齊數據的處理是完全不理想的。有沒有什麼辦法來優化我的代碼,或者至少讓它更簡單一些,或者我的代碼已經是最好的了?優化未對齊的SSE2/AVX2 XOR

下面是代碼的重要部分(對於這個問題,我假設數據將永遠足以運行一次AVX2循環,但同時它最多隻能運行幾次) :

// circular shift left for uint32 
int cshiftl_u32(uint32_t num, uint8_t shift) { 
    return (num << shift) | (num >> (32 - shift));                  
}                              

// circular shift right for uint32 
int cshiftr_u32(uint32_t num, uint8_t shift) { 
    return (num >> shift) | (num << (32 - shift));                  
}                              

void optimized_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) { 
    if (ds == de) return; // zero data len -> nothing to do 

    uint8_t maskOffset = 0; 

// process single bytes till 4 byte alignment (<= 3) 
    for (; ds < de && ((uint64_t)ds & (uint64_t)3); ds++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

    if (ds == de) return; // done, return 

    if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions 
     mask = cshiftl_u32(mask, maskOffset); 

     maskOffset = 0; 
    } 

// process 4 byte block till 8 byte alignment (<= 1) 
    uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); 

    if (ds < de32 && ((uint64_t)de & (uint64_t)7)) { 
     *(uint32_t *)ds ^= mask; // mask is uint32_t 

     if (++ds == de) return; 
    } 

// process 8 byte block till 16 byte alignment (<= 1) 
    uint64_t mask64 = mask | (mask << 4); 
    uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63)); 

    if (ds < de64 && ((uint64_t)ds & (uint64_t)15)) { 
     *(uint64_t *)ds ^= mask64; 

     if (++ds == de) return; // done, return 
    } 


// process 16 byte block till 32 byte alignment (<= 1) (if supported) 
#ifdef CPU_SSE2 
    __m128i v128, v128_mask; 
    v128_mask = _mm_set1_epi32(mask); 

    uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127)); 

    if (ds < de128 && ((uint64_t)ds & (uint64_t)31)) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 

     if (++ds == de) return; // done, return 
    } 

#endif 
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards) 
    __m256i v256, v256_mask; 
    v256_mask = _mm256_set1_epi32(mask); 

    uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255)); 

    for (; ds < de256; ds+=32) { 
     v256 = _mm256_load_si256((__m256i *)ds); 
     v256 = _mm256_xor_si256(v256, v256_mask); 
     _mm256_store_si256((__m256i *)ds, v256); 
    } 

    if (ds == de) return; // done, return 
#endif 
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported) 
    for (; ds < de128; ds+=16) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 
    } 

    if (ds == de) return; // done, return 

#endif 
    // process remaining 8 byte blocks 
    // this should always be supported, so remaining can be assumed to be executed <= 1 times 
    for (; ds < de64; ds += 8) { 
     *(uint64_t *)ds ^= mask64; 
    } 

    if (ds == de) return; // done, return 

    // process remaining 4 byte blocks (<= 1) 
    if (ds < de32) { 
     *(uint32_t *)ds ^= mask; 

     if (++ds == de) return; // done, return 
    } 


    // process remaining bytes (<= 3) 

    for (; ds < de; ds ++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

} 

PS:請忽略使用#ifdef而不是cpuid或cpuid等來檢測cpu標誌。

+0

您是否試過計時代碼? (另外,你可能想用圓括號將你的條件語句中的'&'換行) –

+1

定時不會真的有幫助,因爲我只能對輸入的數據進行假設,但不會得到任何實際的結果投入幾個月後。此外,我只會得到一些絕對數字,這對我來說並沒有什麼幫助,因爲我的問題沒有找到代碼用xy輸入執行需要多長時間,但是如何使它更快,例如,我不知道要改變什麼。 P.S .:裹住按鈕&爲了更容易理解,thx的提示! – griffin

+1

我認爲你會發現數據依賴性失速超過了對齊/未對齊的好處。如果您可以將循環展開2倍,則應該看到顯着的改進。 – BitBank

回答

2

與手冊中的說法不同,大多數英特爾處理器在處理未對齊的數據方面確實相當出色。由於您正在使用英特爾的編譯器內置函數進行向量處理,因此我認爲您可以訪問合理的最新版本icc

如果你不能自然地對齊你的數據,那麼我恐怕你所做的儘可能地接近最佳性能。在使Xeon Phi(64字節向量寄存器)/未來更長的向量處理器更易於讀取和部署的方面,我建議您開始使用Intel Cilk Plus

例子:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) { 
    while (length & 0x3) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    // switch to 4 bytes per block 
    uint32_t _d = d; 
    length >>= 2; 

    // Intel Cilk Plus Array Notation 
    // Should expand automatically to the best possible SIMD instructions 
    // you are compiling for 
    _d[0:length] ^= mask; 
} 

請注意,我沒有測試這個代碼,因爲我沒有獲得英特爾編譯器現在。如果您遇到問題,那麼下週我回到辦公室時可以重新開始。

如果你更喜歡內部函數,然後正確使用預處理宏可以顯著簡化你的生活:

#if defined(__MIC__) 
// intel Xeon Phi 
#define VECTOR_BLOCKSIZE 64 
// I do not remember the correct types/instructions right now 
#error "TODO: MIC handling" 
#elif defined(CPU_AVX2) 
#define VECTOR_BLOCKSIZE 32 
typedef __m256i my_vector_t; 
#define VECTOR_LOAD_MASK _mm256_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask)) 
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16 
typedef __m128i my_vector_t; 
#define VECTOR_LOAD_MASK _mm128_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask)) 
#else 
#define VECTOR_BLOCKSIZE 8 
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask)) 
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask) 
typedef uint64_t my_vector_t; 
#fi 

void optimized_xor_32(uint32_t mask, uint8_t *d, size_t length) { 
    size_t i; 

    // there really is no point in having extra 
    // branches for different vector lengths if they are 
    // executed at most once 
    // branch prediction is your friend here 
    // so we do one byte at a time until the block size 
    // is reached 

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    my_vector_t * d_vector = (my_vector_t *)d; 
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask); 

    size_t vector_legth = length/VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift 
    length &= VECTOR_BLOCKSIZE -1; // remaining length 

    for (i = 0; i < vector_legth; i++) { 
     VECTOR_XOR(d_vector + i, vector_mask); 
    } 

    // process the tail 
    d = (uint8_t*)(d_vector + i); 
    for (i = 0; i < length; i++) { 
     d[i] ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); 
    } 

} 

在另一方面:您可能需要使用的x86旋轉指令,而不是一位向旋轉mask

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc") 
+0

我沒有使用icc,除了gcc,我沒有任何特殊的icc訪問。雖然不知道旋轉指令,但必須查看它究竟做了什麼,thx! – griffin

+0

@griffin好的,我的印象是'_mm_load_si128'和family是'icc'內置的。在這種情況下,你應該拿我的第二個代碼片段,只是沒有MIC的部分。可悲的是沒有固有的旋轉指令,我知道,例如'htons'使用2字節旋轉。 –

+0

Upvoted,但是當我有時間時可能會嘗試,這可能不會那麼早發生,但是當我測試它的工作並且表現良好時,我將確保接受這一點。感謝您的期待! – griffin