我會同意答案(現在刪除)和@harrism評論。由於我對非推力方法付出了一些努力,因此我將介紹我的發現。
我試圖天真地使用__shfl()
在warp層次上實現二分搜索,然後在整個數據集上重複該二分搜索,將數據集通過每個32元素組。
這很令人尷尬,但我的代碼比推力要慢20倍左右(實際上如果你用nvprof
仔細計時,可能會更糟)。
由於問題中的數據大小非常小,以至於計時不準確,因此我將數據大小稍微大於問題中提出的數據大小。
這裏有2種方式完全樣例:
中有什麼問題大致概括,即使用經洗牌創建一個二進制搜索,最多可以搜索到32元對32元有序陣列。重複這個過程爲32個元素的有序數組,然後通過每個有序數組傳遞整個數據集(希望你現在可以開始看到一些低效率。)
使用推力,基本上與@harrism概述了什麼,即對分組數據集進行排序,然後在其上運行矢量化thrust::binary_search。
這裏的例子:
$ cat t1030.cu
#include <stdio.h>
#include <assert.h>
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>
#include <thrust/sort.h>
#include <thrust/binary_search.h>
typedef long mytype;
const int gsize = 32;
const int nGRP = 512;
const int dsize = nGRP*gsize;//gsize*nGRP;
#include <time.h>
#include <sys/time.h>
#define USECPSEC 1000000ULL
unsigned long long dtime_usec(unsigned long long start){
timeval tv;
gettimeofday(&tv, 0);
return ((tv.tv_sec*USECPSEC)+tv.tv_usec)-start;
}
template <typename T>
__device__ T my_shfl32(T val, unsigned lane){
return __shfl(val, lane);
}
template <typename T>
__device__ T my_shfl64(T val, unsigned lane){
T retval = val;
int2 t1 = *(reinterpret_cast<int2 *>(&retval));
t1.x = __shfl(t1.x, lane);
t1.y = __shfl(t1.y, lane);
retval = *(reinterpret_cast<T *>(&t1));
return retval;
}
template <typename T>
__device__ bool bsearch_shfl(T grp_val, T my_val){
int src_lane = gsize>>1;
bool return_val = false;
T test_val;
int shift = gsize>>2;
for (int i = 0; i <= gsize>>3; i++){
if (sizeof(T)==4){
test_val = my_shfl32(grp_val, src_lane);}
else if (sizeof(T)==8){
test_val = my_shfl64(grp_val, src_lane);}
else assert(0);
if (test_val == my_val) return_val = true;
src_lane += (((test_val<my_val)*2)-1)*shift;
shift>>=1;
assert ((src_lane < gsize)&&(src_lane > 0));}
if (sizeof(T)==4){
test_val = my_shfl32(grp_val, 0);}
else if (sizeof(T)==8){
test_val = my_shfl64(grp_val, 0);}
else assert(0);
if (test_val == my_val) return_val = true;
return return_val;
}
template <typename T>
__global__ void bsearch_grp(const T * __restrict__ search_grps, T *data){
int idx = threadIdx.x+blockDim.x*blockIdx.x;
int tid = threadIdx.x;
if (idx < gsize*nGRP){
T grp_val = search_grps[idx];
while (tid < dsize){
T my_val = data[tid];
if (bsearch_shfl(grp_val, my_val)) data[tid] = -1;
tid += blockDim.x;}
}
}
int main(){
// data setup
assert(gsize == 32); //mandatory (warp size)
assert((dsize % 32)==0); //needed to preserve shfl capability
thrust::host_vector<mytype> grps(gsize*nGRP);
thrust::host_vector<mytype> data(dsize);
thrust::host_vector<mytype> result(dsize);
for (int i = 0; i < gsize*nGRP; i++) grps[i] = i;
for (int i = 0; i < dsize; i++) data[i] = i;
// method 1: individual shfl-based binary searches on each group
mytype *d_grps, *d_data;
cudaMalloc(&d_grps, gsize*nGRP*sizeof(mytype));
cudaMalloc(&d_data, dsize*sizeof(mytype));
cudaMemcpy(d_grps, &(grps[0]), gsize*nGRP*sizeof(mytype), cudaMemcpyHostToDevice);
cudaMemcpy(d_data, &(data[0]), dsize*sizeof(mytype), cudaMemcpyHostToDevice);
unsigned long long my_time = dtime_usec(0);
bsearch_grp<<<nGRP, gsize>>>(d_grps, d_data);
cudaDeviceSynchronize();
my_time = dtime_usec(my_time);
cudaMemcpy(&(result[0]), d_data, dsize*sizeof(mytype), cudaMemcpyDeviceToHost);
for (int i = 0; i < dsize; i++) if (result[i] != -1) {printf("method 1 mismatch at %d, was %d, should be -1\n", i, (int)(result[i])); return 1;}
printf("method 1 time: %fs\n", my_time/(float)USECPSEC);
// method 2: thrust sort, followed by thrust binary search
thrust::device_vector<mytype> t_grps = grps;
thrust::device_vector<mytype> t_data = data;
thrust::device_vector<bool> t_rslt(t_data.size());
my_time = dtime_usec(0);
thrust::sort(t_grps.begin(), t_grps.end());
thrust::binary_search(t_grps.begin(), t_grps.end(), t_data.begin(), t_data.end(), t_rslt.begin());
cudaDeviceSynchronize();
my_time = dtime_usec(my_time);
thrust::host_vector<bool> rslt = t_rslt;
for (int i = 0; i < dsize; i++) if (rslt[i] != true) {printf("method 2 mismatch at %d, was %d, should be 1\n", i, (int)(rslt[i])); return 1;}
printf("method 2 time: %fs\n", my_time/(float)USECPSEC);
// method 3: multiple thrust merges, followed by thrust binary search
return 0;
}
$ nvcc -O3 -arch=sm_35 t1030.cu -o t1030
$ ./t1030
method 1 time: 0.009075s
method 2 time: 0.000516s
$
我是在Linux上運行此,CUDA 7.5,GT640 GPU。顯然,不同GPU上的性能會有所不同,但如果任何GPU顯着縮小差距,我會感到驚訝。
總之,你最好建議使用一個調整好的庫,如推力或幼崽。如果你不喜歡推力的單一性,你可以嘗試幼崽。我不知道Cub是否有二進制搜索,但是對整個分類數據集的單個二分搜索是not a difficult thing to write,這是涉及的時間的較小部分(對於方法2 - 使用nvprof
或附加的時間碼可識別)。
由於你的32個元素的分組範圍已經排序,我還考慮了使用多個thrust::merge
操作而不是單一排序的想法。我不確定哪個會更快,但由於推力方法已經比32元素洗牌搜索方法快得多,我認爲推力(或幼仔)是明顯的選擇。
「我不想使用推力」有什麼理由嗎? –
有很多原因,性能就是其中之一。這是一段較大代碼的子步驟。數組'inputValues'是以非常平凡的方式生成的。你可以說'inputValues'的已排序的32個元素塊不是開始連續的。我不希望將它們複製到連續的內存塊中,而是將它們保存在寄存器或共享內存中。從分離的全局內存到連續內存的收集對我來說似乎不是一個好主意。 – aatish
@ m.s。在這種情況下,您認爲哪種推力功能適合我的需求? – aatish