2016-07-03 40 views
2

我試圖與多生產者和消費者做一個代碼。我爲生產者和消費者創建了多線程,並使用信號量進行同步。代碼與單個生產者和消費者一起工作正常。多生產者 - 消費者執行的效率

我面對的問題是,在程序執行一段時間後,只有consumer1和producer1參與了這個過程。我無法理解其他生產者和消費者發生了什麼事。

我還想知道如何讓多生產者 - 消費者問題高效?所有的生產者和消費者分別得到平等的生產和消費機會的意義上的有效性? C++代碼(它包括了很多的C):

#include <iostream> 
#include <pthread.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <queue> 
using namespace std; 
sem_t empty; 
sem_t full; 
int cnt = 0; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
queue<int> q; 
void *producer(void *a) 
{ 
    int *num = (int *)a; 
    while(1) { 
     sem_wait(&empty); 
     pthread_mutex_lock(&mutex); 
     cnt = cnt+1; 
     q.push(cnt); 
     cout<<cnt<<" item produced by producer "<<(*num+1)<<endl; 
     pthread_mutex_unlock(&mutex); 
     sem_post(&full); 
     sleep(1); 
    } 
} 
void *consumer(void *a) 
{ 
    int *num = (int *)a; 
    while(1) { 
     sem_wait(&full); 
     pthread_mutex_lock(&mutex); 
     cout<<q.front()<<" item consumed by consumer "<<(*num+1)<<endl; 
     q.pop(); 
     pthread_mutex_unlock(&mutex); 
     sem_post(&empty); 
     sleep(1); 
    } 
} 
int main() 
{ 
    pthread_t p[5]; 
    pthread_t c[5]; 
    sem_init(&empty,0,5); 
    sem_init(&full,0,0); 
    int i; 
    for(i = 0; i < 5; i++) { 
     pthread_create(&p[i],NULL,producer,(void *)(&i)); 
    } 
    for(i = 0; i < 5; i++) { 
     pthread_create(&c[i],NULL,consumer,(void *)(&i)); 
    } 
    for(i = 0; i < 5; i++) { 
     pthread_join(p[i],NULL); 
     pthread_join(c[i],NULL); 
    } 
} 

更新代碼:

#include <iostream> 
#include <pthread.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <queue> 
#include <map> 
using namespace std; 
sem_t empty; 
sem_t full; 
int cnt = 0; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
map<pthread_t,int> mc,mp; 
queue<int> q; 
void *producer(void *a) 
{ 
    while(1) { 
     sem_wait(&empty); 
     pthread_mutex_lock(&mutex); 
     cnt = cnt+1; 
     q.push(cnt); 
     cout<<cnt<<" item produced by producer "<<mp[pthread_self()]<<endl; 
     pthread_mutex_unlock(&mutex); 
     sem_post(&full); 
     sleep(1); 
    } 
} 
void *consumer(void *a) 
{ 
    while(1) { 
     sem_wait(&full); 
     pthread_mutex_lock(&mutex); 
     cout<<q.front()<<" item consumed by consumer "<<mc[pthread_self()]<<endl; 
     q.pop(); 
     pthread_mutex_unlock(&mutex); 
     sem_post(&empty); 
     sleep(1); 
    } 
} 
int main() 
{ 
    pthread_t p[5]; 
    pthread_t c[5]; 
    sem_init(&empty,0,5); 
    sem_init(&full,0,0); 
    int i; 
    pthread_mutex_lock(&mutex); 
    for(i = 0; i < 5; i++) { 
     pthread_create(&p[i],NULL,producer,NULL); 
     pthread_create(&c[i],NULL,consumer,NULL); 
     mc[c[i]] = i+1; 
     mp[p[i]] = i+1; 
    } 
    pthread_mutex_unlock(&mutex); 
    for(i = 0; i < 5; i++) { 
     pthread_join(p[i],NULL); 
     pthread_join(c[i],NULL); 
    } 
} 
+0

看看您傳遞給pthread_create的參數以及您如何在線程函數中使用該信息。考慮是否存在競爭條件。考慮傳遞'i'與傳遞'&i'的影響。 –

+0

任何理由使用操作系統特定的'pthread'而不是標準的'std :: thread'? – Christophe

+0

@Christophe:因爲我沒有在C++中學過線程,所以我使用了c版本。 –

回答

2

簡短的回答

的線程並在事實上執行與平等的機會,但他們只是打印出一個不是他們的標識符。

詳細解釋

您保持在每個線程的指針num到線程數。這是指向保存的值的指針,而不是值本身。所以所有的線程指向同一個計數器,想要找到他們自己的標識符。

每次訪問*num時,您都可以訪問的不是i在啓動線程時的值,而是其當前值。

不幸的是,在每個循環的main()中,都重用了變量i。所以最後一個循環,你會將i設置回0,並等待第一個線程加入。但是所有這些線程都會永久循環,所以循環很難有機會超出這個初始值。因此,每個線程都認爲這是數字*num+1,此時爲1。

請注意,您在註釋中指出某人創建競爭條件的方式:所有消費者和生產者線程取消引用指針,訪問受互斥鎖保護區域中的相同變量。還行吧。但是,當他們正在讀取變量時,主線程仍然很樂意將共享變量更改爲任何鎖之外。這絕對是一場種族風險。

解決方法

std::thread將允許您通過walue通過i,讓每個線程擁有的是ID自己的不變副本。

使用pthreads,您必須將指針傳遞給值。不幸的是,即使你在線程開始的時候做了一個值指向的本地拷貝,你仍然處於競爭狀態。

一個快速的解決方案,以觀察哪個線程真的在做這項工作將打印輸出結果pthread_self()(見here如何做)。或者將id存儲在int數組中,並將每個線程的地址傳遞給該數組中的唯一元素。

+0

thread_join如何改變初始值?我不明白嗎? –

+0

@ShivamMitra thread_join並沒有改變任何東西,但是你在一個以'for(i = 0;'和不幸的開頭的循環開始,它和我指向的所有num指針都是一樣的。 – Christophe

+0

任何關於如何打印哪個線程已經產生或消耗了? –