2016-02-07 66 views
0

我想實現一個代碼練習同步之間的正確傳遞的值,因此可能不是最好的設計或方法,但目標是如下無法重新獲得互斥體和線程

  • 主線程

    1. 創建100個整數和等待的任何線程的有效載荷可用

    2. 當它從一個線程其可用的信號 - 它解鎖有效載荷複製和前進至創建另一個有效載荷

  • 工作線程

    1. 上創造它使本身可用於數據處理,併發送信號,其可用

    2. 試圖鎖定數據來自主線程的有效載荷並將其複製到本地陣列 (此處觀察錯誤 - 無法正確訪問數據)

    3. 關閉可用 符號通過本地副本(無法關閉可用狀態爲關閉)

    4. 繼續處理數據


#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <stdbool.h> 

#define WORKERS 2 
#define ARRAY_ELEMENTS 100 
#define MAX 1000 


pthread_mutex_t mutex_bucket1 = PTHREAD_MUTEX_INITIALIZER; 
pthread_mutex_t mutex_signal = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond_go = PTHREAD_COND_INITIALIZER; 
pthread_cond_t cond_busy = PTHREAD_COND_INITIALIZER; 

static int value = 0; 
bool available = false; 

void *worker_thread(void *pbucket) 
{ 
    sleep(5); 
    while(1) 
    { 
     unsigned int count = 0; 
     int local_array[ARRAY_ELEMENTS]; 
     int *ptbucket = (int*)pbucket; 
     setbuf(stdout, NULL); 

     pthread_mutex_lock(&mutex_signal); 
     printf(" -------------- \n chainging state to available \n --------- "); 
     available = true; 
     printf(" -------------- \n from thread sending go signal \n --------- "); 
     pthread_cond_signal(&cond_go); 
     pthread_mutex_unlock(&mutex_signal); 


     pthread_mutex_lock(&mutex_bucket1); 
     printf(" -------------- \n data part locked in thread for copying \n --------- "); 
     while(count < ARRAY_ELEMENTS) 
     { 
      printf(" %d - \n", ptbucket[count]); /***incorrect values***/ 
      local_array[count] = ptbucket[count]; 
      count++; 
     } 
     pthread_mutex_unlock(&mutex_bucket1); 

     /*Never able to acquire mutex_signal and change state to not available*/ **BUG** 
     pthread_mutex_lock(&mutex_signal); 
     printf(" -------------- \n chainging state to not available \n --------- "); 
     available = false; 
     pthread_mutex_unlock(&mutex_signal); 

     count = 0; 

     while(count < ARRAY_ELEMENTS) 
     { 
      printf(" %d - \n", local_array[count]); 
      count++; 
     } 

     printf(" -------------- \n about to sleep for 5secs \n --------- "); 
     sleep(5); 
    } 
} 

int main(void) 
{ 
    pthread_t thread_id[WORKERS]; 

    unsigned int* pbucket1 = (int*) malloc(sizeof(int) * ARRAY_ELEMENTS); 

    unsigned int* pbucket; 

    for(int i = 0; i < WORKERS - 1; i++) 
    { 
     pthread_create(&thread_id[i], NULL, worker_thread, (void *) pbucket); 
    } 

    for(int i = 0; i < MAX; i++) 
    { 
     unsigned int count = 0; 

     pbucket = pbucket1; 

     // Make the payload ready 
     pthread_mutex_lock(&mutex_bucket1); 

     printf(" -------------- creating data payload --------- \n"); 

     while(count < ARRAY_ELEMENTS) 
     { 
      pbucket1[count] = i; 
      i++; 
      count++; 
     } 

     printf(" -------------- \n waiting for go signal \n --------- "); 

     while(!available) 
     { 
      pthread_cond_wait(&cond_go, &mutex_signal); 
     } 

     pthread_mutex_unlock(&mutex_bucket1); 

     /*I believe after we unlock variable "available" can be mutexed 
      again by other thread but seems thinking is flawed */ 

     printf(" -------------- \n Main thread sleep for 3 seconds \n --------- "); 
     sleep(3); 
    } 

    for(int i = 0; i < WORKERS; i++) 
    { 
     pthread_join(thread_id[i], NULL); 
    } 

    return 0; 
} 
+0

'printf'不針對多個線程進行同步。你有多個線程在彼此之上使用它。 – e0k

+0

@eok - 我試圖誘導睡眠人爲地購買時間..也爲目前的代碼,我保持線程數只有1,仍然無法取得任何有意義的結果 – oneday

+0

我也評論了所有,但1個printf - 打印數字,仍然獲得相同的結果 – oneday

回答

2

我覺得有些你的想法是倒退的;它不應該是等待的主要上下文,它應該是工作線程正在等待數據...

主線程的工作應該是保持填充有效負載和每次喚醒一個線程來處理它。

所以這裏的一些潦草的代碼,多一點理智,我認爲:

/** 
    file: answer.c 
    compile: gcc -o answer answer.c -pthread 
    usage: answer [numThreads] [numElements] 
**/ 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <pthread.h> 

#define STATE_WAIT 1 
#define STATE_READY 2 

void *routine(void*); 

typedef struct _shared_t { 
    pthread_mutex_t  m; 
    pthread_cond_t  c; 
    unsigned char  state; 
    int     *payload; 
    size_t    numElements; 
    pthread_t   *threads; 
    size_t    numThreads; 
} shared_t; 

static inline void shared_init(shared_t *shared, size_t numThreads, size_t numElements) { 
    memset(shared, 0, sizeof(shared_t)); 

    pthread_mutex_init(&shared->m, NULL); 
    pthread_cond_init(&shared->c, NULL); 

    shared->state = STATE_WAIT; 

    shared->numThreads = numThreads; 
    shared->numElements = numElements; 

    { 
     int it = 0; 

     shared->threads = (pthread_t*) calloc(shared->numThreads, sizeof(pthread_t)); 

     while (it < shared->numThreads) { 
      if (pthread_create(&shared->threads[it], NULL, routine, shared) != 0) { 
       break; 
      } 
      it++; 
     } 
    } 
} 

static inline void shared_populate(shared_t *shared) { 
    if (pthread_mutex_lock(&shared->m) != 0) { 
     return; 
    } 

    shared->payload = (int*) calloc(shared->numElements, sizeof(int)); 

    { 
     int it = 0, 
      end = shared->numElements; 

     while (it < end) { 
      shared->payload[it] = rand(); 

      it++; 
     } 
    } 

    shared->state = STATE_READY; 

    pthread_cond_signal(&shared->c); 

    pthread_mutex_unlock(&shared->m); 
} 

static inline void shared_cleanup(shared_t *shared) { 
    int it = 0, 
     end = shared->numThreads; 

    while (it < end) { 
     pthread_join(shared->threads[it], NULL); 
    } 

    pthread_mutex_destroy(&shared->m); 
    pthread_cond_destroy(&shared->c); 

    free(shared->threads); 
} 

void* routine(void *arg) { 
    shared_t *shared = (shared_t*) arg; 
    int *payload; 

    do { 
     if (pthread_mutex_lock(&shared->m) != 0) { 
      break; 
     } 

     while (shared->state == STATE_WAIT) { 
      pthread_cond_wait(&shared->c, &shared->m); 
     } 

     payload = shared->payload; 

     shared->state = STATE_WAIT; 

     pthread_mutex_unlock(&shared->m); 

     if (payload) { 
      int it = 0, 
       end = shared->numElements; 

      while (it < end) { 
       printf("Thread #%ld got payload %p(%d)=%d\n", 
        pthread_self(), payload, it, payload[it]); 
       it++; 
      } 

      free(payload); 
     } 
    } while(1); 

    pthread_exit(NULL); 
} 

int main(int argc, char *argv[]) { 
    shared_t shared; 

    int numThreads = argc > 1 ? atoi(argv[1]) : 1; 
    int numElements = argc > 2 ? atoi(argv[2]) : 100; 

    shared_init(&shared, numThreads, numElements); 

    do { 
     shared_populate(&shared); 
    } while (1); 

    shared_cleanup(&shared); 

    return 0; 
} 

顯然,上面的代碼是不是很寬容的錯誤,並且不容易關機乾淨......這是插圖只要。

讓我們先來看看main讓我們知道什麼是主要程序的流程將是:

int main(int argc, char *argv[]) { 
    shared_t shared; 

    int numThreads = argc > 1 ? atoi(argv[1]) : 1; 
    int numElements = argc > 2 ? atoi(argv[2]) : 100; 

    shared_init(&shared, numThreads, numElements); 

    do { 
     shared_populate(&shared); 
    } while (1); 

    shared_cleanup(&shared); 

    return 0; 
} 

它使堆棧上的shared_t

typedef struct _shared_t { 
    pthread_mutex_t  m; 
    pthread_cond_t  c; 
    unsigned char  state; 
    int     *payload; 
    size_t    numElements; 
    pthread_t   *threads; 
    size_t    numThreads; 
} shared_t; 

晴言自明,互斥,條件和狀態是同步所必需的。

首先把shared_t必須互斥,條件,狀態,並使用所提供的選項的線程進行初始化:

static inline void shared_init(shared_t *shared, size_t numThreads, size_t numElements) { 
    memset(shared, 0, sizeof(shared_t)); 

    pthread_mutex_init(&shared->m, NULL); 
    pthread_cond_init(&shared->c, NULL); 

    shared->state = STATE_WAIT; 

    shared->numThreads = numThreads; 
    shared->numElements = numElements; 

    { 
     int it = 0; 

     shared->threads = (pthread_t*) calloc(shared->numThreads, sizeof(pthread_t)); 

     while (it < shared->numThreads) { 
      if (pthread_create(&shared->threads[it], NULL, routine, shared) != 0) { 
       break; 
      } 
      it++; 
     } 
    } 
} 

當通過該程序創建的工作線程,他們被迫進入等待狀態。

在環路shared_populate第一個電話喚醒了第一線的有效載荷設置一些隨機數後:

static inline void shared_populate(shared_t *shared) { 
    if (pthread_mutex_lock(&shared->m) != 0) { 
     return; 
    } 

    shared->payload = (int*) calloc(shared->numElements, sizeof(int)); 

    { 
     int it = 0, 
      end = shared->numElements; 

     while (it < end) { 
      shared->payload[it] = rand(); 

      it++; 
     } 
    } 

    shared->state = STATE_READY; 

    pthread_cond_signal(&shared->c); 

    pthread_mutex_unlock(&shared->m); 
} 

注意使用pthread_cond_signal超過pthread_cond_broadcast,因爲我們只是想喚醒的第一個線程。

void* routine(void *arg) { 
    shared_t *shared = (shared_t*) arg; 
    int *payload; 

    do { 
     if (pthread_mutex_lock(&shared->m) != 0) { 
      break; 
     } 

     while (shared->state == STATE_WAIT) { 
      pthread_cond_wait(&shared->c, &shared->m); 
     } 

     payload = shared->payload; 

     shared->state = STATE_WAIT; 

     pthread_mutex_unlock(&shared->m); 

     if (payload) { 
      int it = 0, 
       end = shared->numElements; 

      while (it < end) { 
       printf("Thread #%ld got payload %p(%d)=%d\n", 
        pthread_self(), payload, it, payload[it]); 
       it++; 
      } 

      free(payload); 
     } 
    } while(1); 

    pthread_exit(NULL); 
} 

因此,我們在呼籲pthread_cond_waitroutine醒來,狀態已經改變,所以我們打出來的循環中,我們將指向有效載荷,復位狀態等,並釋放互斥。

此時main可以重新填充有效負載並喚醒下一個線程,同時當前工作線程可以處理,然後釋放有效負載。

幾點建議:

  • 始終使用盡可能少的互斥和條件變量的可能(KISS)
  • 研究條件變量的原子性
  • 始終遵循關於收購和互斥的釋放基本規則併發送條件變量的信號:
    • 如果您鎖定它,請將其解鎖。
    • 只有等待對於東西:始終需要謂詞等待循環。

如果您不能重現我做,然後採取代碼,並嘗試在其擴展;你需要做的第一件事是能夠優雅地關閉進程(輸入shared_cleanup),也許你需要一個可變大小的有效載荷,或者原始問題中沒有提到的其他需求。

關於printf ...的注意事項附加到流不保證是原子性的,它發生在* nix上的大部分時間...因爲我們只是在做節目並告訴,不需要關心...通常,不要依賴原子性來進行任何流操作...

+0

非常感謝您的參考代碼和解釋。它的設計更清晰,更有意義。你能不能請我指出爲什麼在我的代碼中,我無法重新獲取互斥鎖?這也有助於我更清楚地瞭解鎖定和解鎖機制 – oneday