2016-03-07 47 views
0

我想實現並行基數基數排序與基數256.似乎我函數srt在單線程中運行良好。但是,當有更多的線程用於隨機數據時,有時會出現錯誤:函數srt的「訪問衝突讀取位置」後跟「marker [index]」的地址。它在函數srt的第15行中斷行,即「tmp = marker [index]」,其索引值爲63.任何一個解釋發生了什麼?C++並行到位基數排序

inline 
void count(unsigned* list, int size, int* histogram) { 
    for (int i = 0; i < size; ++i) { 
     ++histogram[(list[i]>>24)]; 
    } 
} 
void srt(int* histogram, unsigned** marker) { 
    static const int bin_size = 256; 
    int left = histogram[0]; 
    int index; 
    int tmp; 
    while (left-- > 0) { 
     index = *marker[0] >> 24; 
     while (index != 0) { 
      tmp = *marker[index]; 
      *marker[index]++ = *marker[0]; 
      *marker[0] = tmp; 
      index = *marker[0] >> 24; 
     } 
     ++marker[0]; 
    } 
    for (int k = 1; k < bin_size; ++k) { 
     left = histogram[k] - (marker[k] - marker[k - 1]); 
     while (left-- > 0) { 
      index = *marker[k] >> 24; 
      while (index != k) { 
       tmp = *marker[index]; 
       *marker[index]++ = *marker[k]; 
       *marker[k] = tmp; 
       index = *marker[k] >> 24; 
      } 
      ++marker[k]; 
     } 
    } 
} 
void parallel_sort(unsigned* list, int size) { 
    //Build histogram 
    static const int bin_size = 256; 
    int histogram[bin_size] = { 0 }; 
    int histogram1[bin_size] = { 0 }; 
    int histogram2[bin_size] = { 0 }; 
    int histogram3[bin_size] = { 0 }; 
    const int partial_size = size/4; 
    count(list, partial_size, histogram); 
    count(&list[partial_size], partial_size, histogram1); 
    count(&list[2 * partial_size], partial_size, histogram2); 
    count(&list[3 * partial_size], partial_size + (size % 4), histogram3); 
    unsigned int* marker[bin_size]; 
    unsigned int* marker1[bin_size]; 
    unsigned int* marker2[bin_size]; 
    unsigned int* marker3[bin_size]; 
    unsigned int* previous = list; 
    for (int i = 0; i < bin_size; ++i) { 
     marker[i] = previous; 
     marker1[i] = marker[i] + histogram[i]; 
     marker2[i] = marker1[i] + histogram1[i]; 
     marker3[i] = marker2[i] + histogram2[i]; 
     previous = marker3[i] + histogram3[i]; 
    } 
    //Breaks in srt in any of those threads 
    thread t21(srt, histogram1, marker1); 
    thread t22(srt, histogram2, marker2); 
    thread t23(srt, histogram3, marker3); 
    srt(histogram, marker); 
    t21.join(); 
    t22.join(); 
    t23.join(); 
//TODO 
} 
int main() { 
    const int size = 100000; 
    unsigned list[size]; 
    srand(time(NULL)); 
    for (int i = 0;i < size;++i) { 
     list[i] = rand()*rand(); 
    } 
    parallel_sort(list, size); 
} 

我使用的i3戴爾的Windows 10時,Visual Studio 2015年下列選項

/MP /GS /analyze- /W3 /Zc:wchar_t /ZI /Gm /Od /sdl /Fd"Debug\vc140.pdb" /Zc:inline /fp:precise /D "_MBCS" /errorReport:prompt /WX- /Zc:forScope /RTC1 /Gd /Oy- /MDd /Fa"Debug\" /EHsc /nologo /Fo"Debug\" /Fp" 
+0

如果你可以調試你的代碼(帶斷點),它會更容易找到它崩潰的地方 – 2016-03-07 13:51:00

+0

在函數srt中,在「tmp = * marker [index]」其中索引是63. –

+0

嗯,你正在初始化你的'標記'使用'列表'。那麼你在哪裏聲明'list'?我認爲這是因爲你正在做'marker2 [i] = marker1 [i] + histogram1 [i];'和類似的線條。想象一下,如果'histogram1 [i]'有一個非常大的值,使'marker2 [i]'指向'list'之外的東西? – 2016-03-07 14:02:23

回答

0

的問題是在這裏:

int main() { 
    const int size = 100000; 
    unsigned list[size]; 
    srand(time(NULL)); 
    for (int i = 0;i < size;++i) { 
     list[i] = rand()*rand(); 
    } 
    parallel_sort(list, size); 
} 

所以如果list[i] = rand()*rand();導致許多是否大於size這是100,000?你的數組的指針絕對會超出list的範圍,這就是你的程序崩潰的原因。總之,由於您使用的是基數排序,所以您應該確保所有要排序的值都在數組最大值以下。

+0

請注意,這只是第一次基於前導8位構建直方圖的基數排序。 (它使用>> 24放下以下位。)這就是爲什麼每個直方圖的大小爲256. –

0

嘗試並行基數波紋代碼:

#include <stdio.h> 
#include <errno.h> 
#include <string.h> 
#include <assert.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h> 
#include <sys/time.h> 

enum errors { 
    INVALID_USAGE, 
    ERROR_OPENING_INPUT, 
    ERROR_OPENING_OUTPUT, 
    INVALID_FORMAT, 
    MEMORY_ERROR, 
    THREAD_CREATE_ERROR, 
    THREAD_JOIN_ERROR 
}; 

const unsigned char KEY_LENGTH = 7; 
const unsigned char CHARKEY_LENGTH = KEY_LENGTH + 1; 
const unsigned short INPUT_SPACE = 256; 

struct thread_info 
{ 
    unsigned char **input; 
    unsigned char **output; 
    unsigned short thread_idx; 
    unsigned int first_idx; 
    unsigned int last_idx; 
    pthread_barrier_t *barrier; 
    unsigned int *local_counters; // array of array of integer 
    unsigned int thread_count; 
}; 

inline unsigned int coordinate(const unsigned int line) 
{ 
    return (line * CHARKEY_LENGTH); 
} 

// map each string of the memory array input to a pointer in output 
inline void map_strings(unsigned char *const input, unsigned char 
**output, const unsigned int size) 
{ 
    for(unsigned int i = 0; i < size; ++i){ 
    output[i] = &(input[coordinate(i)]); 
    } 
} 

inline unsigned short core_count() 
{ 
    // Linux, Solaris, AIX, etc: 
    return (unsigned short)sysconf(_SC_NPROCESSORS_ONLN); 
} 

void print_usage(const char *const prog_name) 
{ 
    fprintf(stderr, "Usage: " 
        "%s input_file ouput_file\n", prog_name); 
} 

/* 
    counter the occurences of each char in input at radix i 
    first_idx and last_idx define the range of the input where the count 
is needed 
    the result is stored in counter 
*/ 
inline void count_char(const unsigned char *const *const input, 
         const unsigned int first_idx, 
         const unsigned int last_idx, 
         const unsigned int radix, 
         unsigned int *counter) 
{ 
    memset(counter, 0, INPUT_SPACE*sizeof(unsigned int)); 
    for(unsigned int i = first_idx; i < last_idx; ++i){ 
    const unsigned char c = input[i][radix]; 
    ++(counter[c]); 
    } 
} 

/* compute the offset of the current thread 
    local_counters: array of all coutners 
    thread_idx: current thread idx 
    thread_count: number of thread 
    offset: the array which is populated with the offset 
*/ 
inline void compute_offset(const unsigned int *const local_counters, 
          const unsigned int thread_idx, 
          const unsigned int thread_count, 
          unsigned int *const offset){ 
    unsigned int local_offset[INPUT_SPACE]; 
    unsigned int global_counter[INPUT_SPACE]; 
    for(unsigned int i = 0; i < INPUT_SPACE; ++i){ 
    global_counter[i] = 0; 
    for(unsigned int thread_i = 0; thread_i < thread_count; ++thread_i) 
{ 
     if (thread_idx == thread_i){ 
     local_offset[i] = global_counter[i]; 
     } 
     const unsigned int value = local_counters[thread_i*INPUT_SPACE + 
i]; 
     global_counter[i] += value; 
    } 
    } 
    unsigned int previous_offset = 0; 
    for(unsigned int i = 1; i < INPUT_SPACE; ++i){ 
    previous_offset += global_counter[i-1]; 
    offset[i] = previous_offset + local_offset[i]; 
    } 
} 

inline void sort_input(unsigned char *const *const input, 
         unsigned int *const offset, 
         const unsigned int first_idx, 
         const unsigned int last_idx, 
         const unsigned int radix, 
         unsigned char **const output){ 
    for(unsigned int i = first_idx; i < last_idx; ++i){ 
    const unsigned char c = input[i][radix]; 
    const unsigned int current_offset = offset[c]++; 
    output[current_offset] = input[i]; 
    } 
} 

void *concurrent_radix(void *arg) 
{ 
    thread_info *const info = (thread_info*)arg; 
    const unsigned short thread_idx = info->thread_idx; 
    const unsigned int first_idx = info->first_idx; 
    const unsigned int last_idx = info->last_idx; 
    const unsigned int thread_count = info->thread_count; 
    unsigned int *const local_counters = info->local_counters; 
    unsigned char **input = info->input; 
    unsigned char **output = info->output; 
    pthread_barrier_t *const barrier = info->barrier; 
    unsigned int *const counter = &(local_counters[thread_idx * 
INPUT_SPACE]); 
    int radix = KEY_LENGTH - 1; 
    do { 
    count_char(input, first_idx, last_idx, radix, counter); 

    pthread_barrier_wait(barrier); 

    unsigned int offset[INPUT_SPACE]; 
    compute_offset(local_counters, thread_idx, thread_count, offset); 
    sort_input(input, offset, first_idx, last_idx, radix, output); 

    pthread_barrier_wait(barrier); 

    unsigned char **const temp = input; 
    input = output; 
    output = temp; 
    --radix; 
    } while(radix >= 0); 
    return NULL; 
} 

// sort the radix index 
inline void threaded_radix (unsigned char **input, unsigned char 
**output, const unsigned int nb_keys) 
{ 
    const unsigned short nb_core = core_count(); 

    pthread_t threads[nb_core]; 
    thread_info threads_arg[nb_core]; 
    unsigned int local_counters[nb_core * INPUT_SPACE]; 

    pthread_barrier_t barrier; 
    pthread_barrier_init(&barrier, NULL, nb_core); 

    const unsigned int range = nb_keys/nb_core; 
    unsigned int last_idx = 0; 
    for (unsigned short i = 0; i < nb_core; ++i){ 
    const unsigned int first_idx = last_idx; 
    last_idx = last_idx + range; 

thread_info &info = threads_arg[i]; 
info.input = input; 
info.output = output; 
info.first_idx = first_idx; 
info.last_idx = last_idx; 
info.thread_idx = i; 
info.thread_count = nb_core; 
info.local_counters = local_counters; 
info.barrier = &barrier; 
    } 
    threads_arg[nb_core-1].last_idx = nb_keys; 

    for (unsigned short i = 1; i < nb_core; ++i){ 
    pthread_create(&threads[i], NULL, concurrent_radix, (void *)& 
(threads_arg[i])); 
    } 
    concurrent_radix((void *)&(threads_arg[0])); 

    for (unsigned short i = 1; i < nb_core; ++i){ 
    pthread_join(threads[i], NULL); 
    } 
    pthread_barrier_destroy(&barrier); 
} 

inline void radix_sort(unsigned char *input, unsigned char **output, 
const unsigned int nb_keys) 
{ 
    unsigned char **buffer = (unsigned char **)malloc(nb_keys * 
sizeof(unsigned char*)); 
    map_strings(input, buffer, nb_keys); 
    threaded_radix(buffer, output, nb_keys); 

    free(buffer); 
} 

int main(const int argc, const char *const argv[]) 
{ 
    if(argc < 3) { 
    print_usage(argv[0]); 
    return INVALID_USAGE; 
    } 
    // import the data in a table 
    FILE *input = fopen(argv[1], "r"); 
    if(!input){ 
    const int errsv = errno; 
fprintf(stderr, "%s: %s\n", argv[1], strerror(errsv)); 
    print_usage(argv[0]); 
    return ERROR_OPENING_INPUT; 
    } 

    unsigned int input_size; 
    { 
    int converted = fscanf(input, "%d\n", &input_size); 
    if(converted != 1){ 
     fprintf(stderr, "Invalid file format"); 
     return INVALID_FORMAT; 
    } 
    } 

    unsigned char *input_table = (unsigned char *)malloc(input_size * 
CHARKEY_LENGTH * sizeof(unsigned char)); 
    unsigned char **output_table = (unsigned char **)malloc(input_size * 
sizeof(unsigned char*)); 
    if(!input_table || !output_table){ 
    fprintf(stderr, "Error: not enough memory\n"); 
    return MEMORY_ERROR; 
    } 

    for(unsigned int i = 0; i<input_size; ++i){ 
unsigned char * key = &(input_table[coordinate(i)]); 
size_t size_read = fread(key, 1, CHARKEY_LENGTH, input); 
key[KEY_LENGTH] = '\0'; 
    if(size_read != CHARKEY_LENGTH){ 
     if(feof(input)){ 
     assert(i == (input_size - 1)); 
     } else { 
     fprintf(stderr, "Invalid file format"); 
     return INVALID_FORMAT; 
     } 
    } 
    } 
    fclose(input); 

    // sort 
    struct timeval tick1, tick2; 
    gettimeofday(&tick1, NULL); 

    radix_sort(input_table, output_table, input_size); 

    gettimeofday(&tick2, NULL); 
    const double ellapsed = (tick2.tv_sec + tick2.tv_usec/1000000.0) - 
(tick1.tv_sec + tick1.tv_usec/1000000.0); 
    printf("time for action = %g seconds\n", ellapsed); 

    // write the output 
    FILE *output = fopen(argv[2], "w+"); 
    if(!input){ 
const int errsv = errno; 
    fprintf(stderr, "%s: %s\n", argv[2], strerror(errsv)); 
    print_usage(argv[0]); 
    return ERROR_OPENING_OUTPUT; 
    } 

    fprintf(output, "%d\n", input_size); 
    for(unsigned int i = 0; i < input_size;++i){ 
    fprintf(output, "%s\n", output_table[i]); 
    } 
    fclose(output); 

    free(input_table); 
    free(output_table); 

    return 0; 
} 

不要忘記添加選項-pthread編譯它。

+0

'> -pthread''> Visual Studio 2015' ...我應該告訴爲什麼這對OP來說完全沒用嗎? –