2017-06-07 120 views
2

我有多個讀取線程和一個寫入線程。如果我在其中一個讀線程上鎖定了互斥體並從中發送了廣播,是否可以保證互斥體將被等待在pthread_cond_wait()上的寫入線程鎖定,或者是否存在另一個讀取線程在pthread_mutex_lock()上隱藏的可能性互斥?主要問題是pthread_cond_wait()的優先級高於pthread_mutex_lock()嗎?pthread_cond_wait和pthread_mutex_lock優先級?

如果不是,我該如何實現互斥鎖將始終被pthread_cond_broadcast()上的寫入線程鎖定?

讀主題:

pthread_mutex_lock(mutex); 
pthread_cond_broadcast(cond); 
pthread_mutex_unlock(mutex); 

編寫線程:

pthread_mutex_lock(&mutex); 
pthread_cond_wait(&cond, &mutex); 

回答

0

假設兩個線程,讀取和寫入,在同一時刻到達pthread_mutex_lock。所以,要麼寫線程調用pthread_mutex_lock獲得互斥量,要麼讀取線程。

如果它是寫入線程,讀取的將等待pthread_mutex_lock。寫,通過呼籲pthread_cond_wait版本mutex和塊上cond。它是以原子方式完成的。所以,當讀取線程爲mutex時,我們可以確定讀取的線程在cond上等待。因此,廣播cond到達寫線程,它不再等待cond,但 - 仍在pthread_cond_wait範圍 - 試圖獲得鎖定mutex(保持爲讀線程)。讀取線程在廣播cond後釋放mutex並轉到寫入線程。所以寫線程終於從pthread_cond_wait退出,鎖定了mutex。請記住稍後解鎖它。

如果它是讀取線程,寫入將等待pthread_mutex_lock,讀取將在cond上廣播信號,然後釋放mutex。之後,寫線程獲取pthread_mutex_lock上的mutex並立即釋放它pthread_cond_wait等待cond(請注意,之前的cond廣播對當前的pthread_cond_wait沒有影響)。在讀取線程的下一次迭代中,它獲取鎖定mutex,發送廣播cond並解鎖mutex。這意味着寫入線程在cond上向前移動,並獲取mutex上的鎖定。

它是否回答您關於優先級的問題?


更新後評論。

我們假設我們有一個線程(讓我們將其命名爲A以供將來參考)持有鎖mutex和其他幾個嘗試獲取相同的鎖。只要第一個線程釋放了鎖,就無法預測哪個線程會獲得鎖。此外,如果A線程有一個循環並嘗試重新獲取鎖定mutex,則有可能會被授予此鎖定,並且其他線程會一直等待。添加pthread_cond_wait不會更改授予鎖定範圍內的任何內容。

讓我引用POSIX規範的片段(見https://stackoverflow.com/a/9625267/2989411供參考):

這些功能原子方式釋放互斥和導致調用線程阻塞條件變量COND;原子地這裏的意思是「原子上關於另一個線程訪問互斥體,然後是條件變量」。也就是說,如果另一個線程能夠在約程序線程釋放它之後獲取該互斥體,則該線程中對pthread_cond_broadcast()或pthread_cond_signal()的後續調用將表現得好像它是在about-阻塞線程被阻塞。

這只是標準中關於操作順序的保證。爲其他線程授予鎖定的順序是相當難以預測的,並且根據一些非常微妙的時間波動而變化。

僅用於互斥體相關的代碼,請用一下下面的代碼:

#define _GNU_SOURCE 
#include <pthread.h> 

#include <stdio.h> 
#include <unistd.h> 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

void *th(void *arg) { 
    int i; 
    char *s = arg; 
    for (i = 0; i < 10; ++i) { 
     pthread_mutex_lock(&mutex); 
     printf("%s %d\n", s, i); 
     //sleep(1); 
     pthread_mutex_unlock(&mutex); 
#if 0 
     pthread_yield(); 
#endif 
    } 
    return NULL; 
} 

int main() { 
    int i; 
    for (i = 0; i < 10; ++i) { 
     pthread_t t1, t2, t3; 
     printf("================================\n"); 
     pthread_create(&t1, NULL, th, "t1"); 
     pthread_create(&t2, NULL, th, "  t2"); 
     pthread_create(&t3, NULL, th, "   t3"); 
     pthread_join(t1, NULL); 
     pthread_join(t2, NULL); 
     pthread_join(t3, NULL); 
    } 
    return 0; 
} 

在一體機(單CPU),它總是顯示從T3,T2則整個循環最後從T1。在另一個(2個內核)線程的順序更隨機,但幾乎總是在爲其他線程授予互斥之前,爲每個線程顯示整個循環。很少有一種情況,如:

t1 8 
t1 9 
      t3 0 
    t2 0 
    t2 1 
    [removed other t2 output] 
    t2 8 
    t2 9 
      t3 1 
      t3 2 

#if 1更換#if 0啓用pthread_yield,看結果,是否有輸出。對我而言,它的工作方式是兩個線程交錯顯示輸出,然後第三個線程終於有機會工作。添加另一個或多個線程。用睡覺玩等,它證實了隨機行爲。

如果您希望稍微做一點實驗,請編譯並運行以下代碼片段。這是一個單一生產者的例子 - 多個消費者模型。它可以用兩個參數運行:第一個是消費者線程的數量,第二個是生成的數據序列的長度。如果沒有給出參數,則有一個消費者線程和120個要處理的項目。我還建議睡眠/ usleep的地方標明/* play here */:改變參數的值,在全部除去睡眠,移動它 - 在適當的時候 - 到臨界區或pthread_yield更換和觀察行爲的變化。

#define _GNU_SOURCE 
#include <assert.h> 
#include <limits.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <time.h> 
#include <unistd.h> 

struct data_t { 
    int seq; 
    int payload; 
    struct data_t *next; 
}; 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 
struct data_t *first = NULL, *last = NULL; 
int in_progress = 1; 
int num_data = 120; 

void push(int seq, int payload) { 
    struct data_t *e; 
    e = malloc(sizeof(struct data_t)); 
    e->seq = seq; 
    e->payload = payload; 
    e->next = NULL; 
    if (last == NULL) { 
     assert(first == NULL); 
     first = last = e; 
    } else { 
     last->next = e; 
     last = e; 
    } 
} 

struct data_t pop() { 
    struct data_t res = {0}; 
    if (first == NULL) { 
     res.seq = -1; 
    } else { 
     res.seq = first->seq; 
     res.payload = first->payload; 
     first = first->next; 
     if (first == NULL) { 
      last = NULL; 
     } 
    } 
    return res; 
} 

void *producer(void *arg __attribute__((unused))) { 
    int i; 
    printf("producer created\n"); 
    for (i = 0; i < num_data; ++i) { 
     int val; 
     sleep(1); /* play here */ 
     pthread_mutex_lock(&mutex); 
     val = rand()/(INT_MAX/1000); 
     push(i, val); 
     pthread_mutex_unlock(&mutex); 
     pthread_cond_signal(&cond); 
     printf("prod %3d %3d signaled\n", i, val); 
    } 
    in_progress = 0; 
    printf("prod end\n"); 
    pthread_cond_broadcast(&cond); 
    printf("prod end signaled\n"); 
    return NULL; 
} 

void *consumer(void *arg) { 
    char c_id[1024]; 
    int t_id = *(int *)arg; 
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id); 
    printf("%s created\n", c_id); 
    while (1) { 
     struct data_t item; 
     pthread_mutex_lock(&mutex); 
     item = pop(); 
     while (item.seq == -1 && in_progress) { 
      printf("%s waits for data\n", c_id); 
      pthread_cond_wait(&cond, &mutex); 
      printf("%s got signal\n", c_id); 
      item = pop(); 
     } 
     if (!in_progress && item.seq == -1) { 
      printf("%s detected end of data.\n", c_id); 
      pthread_mutex_unlock(&mutex); 
      break; 
     } 
     pthread_mutex_unlock(&mutex); 
     printf("%s processing %3d %3d\n", c_id, item.seq, item.payload); 
     sleep(item.payload % 10); /* play here */ 
     printf("%s processed %3d %3d\n", c_id, item.seq, item.payload); 
    } 
    printf("%s end\n", c_id); 
    return NULL; 
} 

int main(int argc, char *argv[]) { 
    int num_cons = 1; 
    pthread_t t_prod; 
    pthread_t *t_cons; 
    int i; 
    int *nums; 
    if (argc > 1) { 
     num_cons = atoi(argv[1]); 
     if (num_cons == 0) { 
      num_cons = 1; 
     } 
     if (num_cons > 99) { 
      num_cons = 99; 
     } 
    } 
    if (argc > 2) { 
     num_data = atoi(argv[2]); 
     if (num_data < 10) { 
      num_data = 10; 
     } 
     if (num_data > 600) { 
      num_data = 600; 
     } 
    } 

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data); 
    t_cons = malloc(sizeof(pthread_t) * num_cons); 
    nums = malloc(sizeof(int) * num_cons); 
    if (!t_cons || !nums) { 
     printf("Out of memory!\n"); 
     exit(1); 
    } 
    srand(time(NULL)); 
    pthread_create(&t_prod, NULL, producer, NULL); 

    for (i = 0; i < num_cons; ++i) { 
     nums[i] = i + 1; 
     usleep(100000); /* play here */ 
     pthread_create(t_cons + i, NULL, consumer, nums + i); 
    } 

    pthread_join(t_prod, NULL); 

    for (i = 0; i < num_cons; ++i) { 
     pthread_join(t_cons[i], NULL); 
    } 
    free(nums); 
    free(t_cons); 

    return 0; 
} 

我希望我已經清除你的疑慮,並給你一些代碼進行試驗,並獲得有關並行線程行爲的一些信心。

+0

感謝您的回答。無論如何,我不知道它絕對回答了我的問題。如果一個寫線程正在等待pthread_cond_wait()並且我們有兩個讀線程,會發生什麼情況。一個讀線程設法鎖定互斥併發送廣播。釋放此互斥鎖後,是否有任何保證寫入線程會在等待廣播時鎖定互斥鎖(另一個讀取線程正在等待同時鎖定互斥鎖)? – testtest1235

+0

@ testtest1235我已經用一些附加信息和示例擴展了我的答案。 – ArturFH