該解決方案需要雙聲道排序的兩次通過,以計數重複以及刪除它們(以及將它們移動到陣列的末尾)。 Bitonic排序是O(log(n)^2)
,所以這將運行時間複雜度2(log(n)^2)
,這應該不會是一個問題,除非你在一個循環中運行它。
對於每個元件創建一個簡單的結構,以包括副本的數目,並且如果元件已被添加爲重複,這樣的:
// Note: If you are worried about space, or know that there
// will only be a few duplicates for each element, then
// make the count element smaller
typedef struct {
cl_ulong value;
cl_ulong count : 63;
cl_ulong seen : 1;
} Element;
算法:
您可以從創建一個比較函數開始,該函數會將重複項移動到最後,如果要將重複項添加到元素的總計數中,則對重複項進行計數。這是比較函數背後的邏輯:
- 如果一個元素重複,另一個是不,則返回該非重複的元素是小(不管值的),這將所有重複移動到結束。
- 如果元素重複且右元素未標記爲重複(
seen=0
),則將正確元素的計數添加到左元素的計數中,並將右元素設置爲重複(seen=1
)。這具有將具有特定值的元素的總計數移動到具有該值的數組中最左邊的元素,並且將具有該值的所有重複元素移動到最後。
- 否則返回值越小的元素越小。
比較函數將如下所示:
bool compare(const Element* E1, const Element* E2) {
if (!E1->seen && E2->seen) return true; // E1 smaller
if (!E2->seen && E1->seen) return false; // E2 smaller
// If the elements are duplicates and the right element has
// not yet been "seen" by an element with the same value
if (E1->value == E2->value && !E2->seen) {
E1->count += E2->count;
E2->seen = 1;
return true;
}
// They aren't duplicates, and either
// neither has been seen, or both have
return E1->value < E2->value;
}
雙調排序具有特定的結構,其可與一個圖來很好地示出。在該圖中,每個元素由三元組(a,b,c)
來引用,其中a = value
,b = count
和c = seen
。
每個圖表顯示陣列上的一次雙音排序(垂直線表示元素之間的比較,水平線移動到雙音排序的下一個階段)。使用圖表和上面的比較函數和邏輯,你應該能夠說服自己,這是做什麼需要的。
試驗1:
試驗2:
在運行2的端部,所有的元件由值設置。與seen = 1
在最後重複,並與seen = 0
重複在他們的正確位置和count
是具有相同值的其他元素的數量。
實現:
的圖是彩色編碼來說明雙調排序的子過程。我會將藍色塊稱爲一個階段(圖中每個運行階段都有三個階段)。一般來說,每次運行都會有ceil(log(N))
階段。每個階段都由一些綠色塊組成(我將這些塊稱爲out-in
塊,因爲比較的形狀已經到了)和紅色塊(我將這些塊稱爲constant
塊,因爲要比較的元素之間的距離依然存在不變)。
從圖中可以看出,out-in
塊大小(每個塊中的元素)從2開始,在每次通過時加倍。每個過程的塊大小從塊大小的out-in
的一半開始(在第二個(藍色塊)階段中,在四個紅色塊的每一箇中有兩個元素,因爲綠色塊的大小爲4),並且每個塊的大小爲一半相位內連續的紅色塊垂直線。另外,相位中的constant
(紅色)塊的連續垂直線的數量總是與具有0索引的相位數相同(相位0的紅色塊的0條垂直線,相位1的紅色塊的1條垂直線,和2階段的紅色塊的垂直線 - 每條垂直線是調用該內核的迭代)。
然後可以使內核爲out-in
通行證和constant
通行證,然後調用從主機側的內核(因爲你需要不斷的同步,這是不利的,但你仍然應該看到在連續大性能改進實現)。
從主機側,整體雙調排序可能是:
cl_uint num_elements = 4; // Set number of elements
cl_uint phases = (cl_uint)ceil((float)log2(num_elements));
cl_uint out_in_block_size = 2;
cl_uint constant_block_size;
// Set the elements_buffer, which should have been created with
// with clCreateBuffer, as the first kernel argument, and the
// number of elements as the second kernel argument
clSetKernelArg(out_in_kernel, 0, sizeof(cl_mem), (void*)(&elements_buffer));
clSetKernelArg(out_in_kernel, 1, sizeof(cl_uint), (void*)(&num_elements));
clSetKernelArg(constant_kernel, 0, sizeof(cl_mem), (void*)(&elements_buffer));
clSetKernelArg(constant_kernel, 1, sizeof(cl_uint), (void*)(&num_elements));
// For each pass
for (unsigned int phase = 0; phase < phases; ++phase) {
// -------------------- Green Part ------------------------ //
// Set the out_in_block size for the kernel
clSetKernelArg(out_in_kernel, 2, sizeof(cl_int), (void*)(&out_in_block_size));
// Call the kernel - command_queue is the clCommandQueue
// which should have been created during cl setup
clEnqueNDRangeKernel(command_queue , // clCommandQueue
out_in_kernel , // The kernel
1 , // Work dim = 1 since 1D array
NULL , // No global offset
&global_work_size,
&local_work_size ,
0 ,
NULL ,
NULL);
barrier(CLK_GLOBAL_MEM_FENCE); // Synchronise
// ---------------------- End Green Part -------------------- //
// Set the block size for constant blocks based on the out_in_block_size
constant_block_size = out_in_block_size/2;
// -------------------- Red Part ------------------------ //
for (unsigned int i 0; i < phase; ++i) {
// Set the constant_block_size as a kernel argument
clSetKernelArg(constant_kernel, 2, sizeof(cl_int), (void*)(&constant_block_size));
// Call the constant kernel
clEnqueNDRangeKernel(command_queue , // clCommandQueue
constant_kernel , // The kernel
1 , // Work dim = 1 since 1D array
NULL , // No global offset
&global_work_size,
&local_work_size ,
0 ,
NULL ,
NULL);
barrier(CLK_GLOBAL_MEM_FENCE); // Synchronise
// Update constant_block_size for next iteration
constant_block_size /= 2;
}
// ------------------- End Red Part ---------------------- //
}
然後內核會是這樣的(你也需要把結構的typedef在內核文件中,以便OpenCL編譯器知道什麼是「元素」是):
__global void out_in_kernel(__global Element* elements, unsigned int num_elements, unsigned int block_size) {
const unsigned int idx_upper = // index of upper element in diagram.
const unsigned int idx_lower = // index of lower element in diagram
// Check that both indices are in range (this depends on thread mapping)
if (idx_upper is in range && index_lower is in range) {
// Do the comparison
if (!compare(elements + idx_upper, elements + idx_lower) {
// Swap the elements
}
}
}
的constant_kernel看起來一樣,但是線程映射(你如何確定idx_upper
和idx_lower
)會有所不同。有很多方法可以將線程映射到通常用於模仿圖的元素(請注意,所需的線程數是元素總數的一半,因爲每個線程都可以進行一次比較)。
另一個考慮是如何使線程映射一般(如果你有一些不是2的冪的元素算法不會中斷)。
您是否在排序過程中刪除重複元素?你可以把每個ulong包裝在每個ulong增加1個字節的結構中嗎?或者,你還可以犧牲每個過剩的2 - 4位?如果其中任何一個都沒問題,我認爲我有一個O(log(n)^ 2)解決方案 - 就像是雙向排序一樣。 – RobClucas
刪除重複項:是。如示例中所示,我想只列出項目加上一個計數器的列表。哪個好。 。 。它也可以先計算然後排序。 在每個ulong中增加1個字節的結構中包含每個ulong:確實,我需要將計數器存儲在某個地方嗎?如果您願意分享您的解決方案,我將非常感激。 – Nicola
首先排序,即使重複,只需使用低存儲器算法。然後重複將被組合在一起,所以你可以對它們進行計數並對它們進行分組,並且我也不知道如何在OpenCL上快速執行此操作.... – DarkZeros