2011-01-27 39 views

回答

1

TBB已經包括一種方法(平行快速排序),其是-however使用英特爾螺紋積木 - 實現相當糟糕(運行時至少是線性的,與處理器的數量無關)。

我的建議是你從現有的實現中進行端口並行合併排序。 例如,使用OpenMP的gnu並行模式排序(包含在任何最近的gcc與源文件中)。 只需用一些tbb並行代碼替換所有#pragma omp即可。

2

首先,讓我說,根據我的經驗,tbb :: parallel_sort()相當有效,比我要發佈的代碼快一點(至少對於數千個元素我已經測試過)。

話雖如此,我認爲下面的代碼正是你正在尋找。變量應該是自我解釋和文檔中的代碼應該解釋其餘的 -

這將需要進行並行化:

#include<tbb/parallel_invoke.h> 

如果您選擇使用併發:: parallel_invoke(),它可以工作得更快,那麼包括此:

#include<ppl.h> 

我建議這些設置 -

#define MIN_ELEMENTS_FOR_RECURSION   (50) 
#define MIN_ELEMENTS_FOR_PARALLEL_PROCESSING (100) 

以下是調用的主要功能。參數是迭代器來啓動和隨機存取類的結束(例如,向量,雙端隊列等)和比較功能 -

template <typename T_it, typename T_it_dereferenced> 
void parallelMergeSort(T_it first, T_it last, bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b)) 
{ 
    // create copy of container for extra space 
    std::vector<T_it_dereferenced> copy(first, last); 

    parallelMergeSortRecursive(first, last, copy.begin(), copy.end(), firstLessThanSecond); 
} 

這是遞歸從parallelMergeSort()爲了所謂的各佔一半排序 -

template <typename T_it, typename T_it_dereferenced> 
void parallelMergeSortRecursive(T_it source_first, T_it source_last, T_it copy_first, T_it copy_last, 
bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b), int recursion_depth = 0) 
{ 
    // divide the array in two, and sort the two halves 

    long num_elements = source_last - source_first; 

    if (num_elements > MIN_ELEMENTS_FOR_RECURSION) 
    { 
     T_it source_middle = source_first + num_elements/2; 
     T_it copy_middle = copy_first + num_elements/2; 

     if (num_elements > MIN_ELEMENTS_FOR_PARALLEL_PROCESSING) 
     { 
      // Concurrency::parallel_invoke() may work faster 
      tbb::parallel_invoke(
       [=] { parallelMergeSortRecursive(source_first,  source_middle, copy_first, copy_middle, firstLessThanSecond, recursion_depth + 1); }, 
       [=] { parallelMergeSortRecursive(source_middle, source_last, copy_middle, copy_last,  firstLessThanSecond, recursion_depth + 1); } 
      ); 
     } 
     else // sort serially rather than in parallel 
     { 
      parallelMergeSortRecursive(source_first, source_middle, copy_first, copy_middle, firstLessThanSecond, recursion_depth + 1); 
      parallelMergeSortRecursive(source_middle, source_last, copy_middle, copy_last,  firstLessThanSecond, recursion_depth + 1); 
     } 

     // merge the two sorted halves 

     // we switch source <--> target with each level of recursion. 
     // at even recursion depths (including zero which is the root level) we assume the source is sorted and merge into the target 

     if (recursion_depth % 2 == 0) 
     { 
      merge(source_first, copy_first, copy_middle, copy_last, firstLessThanSecond); 
     } 
     else 
     { 
      merge(copy_first, source_first, source_middle, source_last, firstLessThanSecond); 
     } 
    } 
    else // very few elements remain to be sorted, stop the recursion and sort in place 
    { 
     if (recursion_depth % 2 == 0) 
     { 
      std::stable_sort(source_first, source_last, firstLessThanSecond); 
     } 
     else 
     { 
      std::stable_sort(copy_first, copy_last, firstLessThanSecond); 
     } 
    } 
} 

這是從遞歸函數調用,以合併兩半 -

template <typename T_it, typename T_it_dereferenced> 
void merge(T_it target_first, T_it source_first, T_it source_middle, T_it source_last, 
bool (*firstLessThanSecond)(const T_it_dereferenced& a, const T_it_dereferenced& b)) 
{ 
    // source is assumed to contain two sorted sequences (from first to middle and from middle to last) 

    T_it source_it1 = source_first; 
    T_it source_it2 = source_middle; 
    T_it target_it = target_first; 

    for (/* intentional */ ; source_it1 < source_middle && source_it2 < source_last ; ++target_it) 
    { 
     //if (source_container[i] < source_container[j]) 
     if ( firstLessThanSecond(*source_it1, *source_it2) ) 
     { 
      *target_it = *source_it1; 
      ++source_it1; 
     } 
     else 
     { 
      *target_it = *source_it2; 
      ++source_it2; 
     } 
    } 

    // insert remaining elements in non-completely-traversed-half into original container 
    // only one of these two whiles will execute since one of the conditions caused the previous while to stop 

    for (/* intentional */ ; source_it1 < source_middle ; ++target_it) 
    { 
     *target_it = *source_it1; 
     ++source_it1; 
    } 

    for (/* intentional */ ; source_it2 < source_last ; ++target_it) 
    { 
     *target_it = *source_it2; 
     ++source_it2; 
    } 
}