2011-05-19 137 views
10

我試圖用pthreads和信號量解決生產者 - 消費者問題,但它看起來像生產者線程不生產,並且消費者線程不消耗。看來,正在創建的線程:pthreads生產者 - 消費者問題

/* Do actual work from this point forward */ 
    /* Create the producer threads */ 
    for(c1=1; c1<=argarray[1]; c1++) 
    { 
    pthread_create(&tid, &attr, producer, NULL); 
    printf("Creating producer #%d\n", c1);  
    } 

    /* Create the consumer threads */ 
    for(c1=1; c1<=argarray[2]; c1++) 
    { 
    pthread_create(&tid, &attr, consumer, NULL); 
    printf("Creating consumer #%d\n", c1);  
    } 

因爲「創建生產者#X」和「創建消費者#X」被打印到屏幕上。然而,它不從線內自己打印:

if(insert_item(item)) 
{ 
    fprintf(stderr, "Producer error."); 
} 
else 
{ 
    printf("Producer produced %d\n", item); 
} 

對於消費者線程同樣如此。全碼:

#include "buffer.h" 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <semaphore.h> 

/* Create Buffer */ 
buffer_item buffer[BUFFER_SIZE]; 

/* Semaphore and Mutex lock */ 
sem_t cEmpty; 
sem_t cFull; 
pthread_mutex_t mutex; 

/* Threads */ 
pthread_t tid; /* Thread ID */ 
pthread_attr_t attr; /* Thread attributes */ 

void *producer(void *param); 
void *consumer(void *param); 
void init(); 

/* Progress Counter */ 
int cg; 

main(int argc, char *argv[]) 
{ 
    /* Variables */ 
    int argarray[3], c1; 

    /* Argument counter checks */ 
    if(argc != 4) 
    { 
    fprintf(stderr, "usage: main [sleep time] [# of producer threads] [# of consumer threads]\n"); 
    return -1; 
    } 

    /* Get args from command line and change them into integers */ 
    argarray[0] = atoi(argv[1]); /* How long to sleep before ending */ 
    argarray[1] = atoi(argv[2]); /* Producer threads */ 
    argarray[2] = atoi(argv[3]); /* Consumer threads */ 

    /* Error check */ 
    if(argarray[1]<1) 
    { 
    fprintf(stderr, "argument 2 must be > 0\n"); 
    return -1; 
    } 
    if(argarray[2]<1) 
    { 
    fprintf(stderr, "argument 3 must be > 0\n"); 
    return -1; 
    }  

    init(); 

    /* Do actual work from this point forward */ 
    /* Create the producer threads */ 
    for(c1=1; c1<=argarray[1]; c1++) 
    { 
    pthread_create(&tid, &attr, producer, NULL); 
    printf("Creating producer #%d\n", c1);  
    } 

    /* Create the consumer threads */ 
    for(c1=1; c1<=argarray[2]; c1++) 
    { 
    pthread_create(&tid, &attr, consumer, NULL); 
    printf("Creating consumer #%d\n", c1);  
    } 

    /* Ending it */ 
    sleep(argarray[0]); 

    printf("Production complete.\n"); 
    exit(0); 
} 

void init() 
{ 
    int c2; 

    pthread_mutex_init(&mutex, NULL); /* Initialize mutex lock */ 
    pthread_attr_init(&attr); /* Initialize pthread attributes to default */ 
    sem_init(&cFull, 0, 0); /* Initialize full semaphore */ 
    sem_init(&cEmpty, 0, BUFFER_SIZE); /* Initialize empty semaphore */ 
    cg = 0; /* Initialize global counter */ 
    for(c2=0;c2<BUFFER_SIZE;c2++) /* Initialize buffer */ 
    { 
    buffer[c2] = 0; 
    } 
} 

void *producer(void *param) 
{ 
    /* Variables */ 
    buffer_item item; 

    while(1) 
    { 
    sleep(rand());  
    item = (rand()); /* Generates random item */ 

    sem_wait(&cEmpty); /* Lock empty semaphore if not zero */ 
    pthread_mutex_lock(&mutex); 

    if(insert_item(item)) 
    { 
     fprintf(stderr, "Producer error."); 
    } 
    else 
    { 
     printf("Producer produced %d\n", item); 
    } 

    pthread_mutex_unlock(&mutex); 
    sem_post(&cFull); /* Increment semaphore for # of full */ 
    } 
} 

void *consumer(void *param) 
{ 
    buffer_item item; 

    while(1) 
    { 
    sleep(rand()); 
    sem_wait(&cFull); /* Lock empty semaphore if not zero */ 
    pthread_mutex_lock(&mutex); 
    if(remove_item(&item)) 
    { 
     fprintf(stderr, "Consumer error."); 
    } 
    else 
    { 
     printf("Consumer consumed %d\n", item); 
    } 

    pthread_mutex_unlock(&mutex); 
    sem_post(&cEmpty); /* Increments semaphore for # of empty */ 
    } 
} 

int insert_item(buffer_item item) 
{ 
    if(cg < BUFFER_SIZE) /* Buffer has space */ 
    { 
    buffer[cg] = item; 
    cg++; 
    return 0; 
    } 
    else /* Buffer full */ 
    { 
    return -1; 
    } 
} 

int remove_item(buffer_item *item) 
{ 
    if(cg > 0) /* Buffer has something in it */ 
    { 
    *item = buffer[(cg-1)]; 
    cg--; 
    return 0; 
    } 
    else /* Buffer empty */ 
    { 
    return -1; 
    } 
} 

端子輸出:

[email protected]:~/Desktop/PCthreads$ ./main 10 10 10 
Creating producer #1 
Creating producer #2 
Creating producer #3 
Creating producer #4 
Creating producer #5 
Creating producer #6 
Creating producer #7 
Creating producer #8 
Creating producer #9 
Creating producer #10 
Creating consumer #1 
Creating consumer #2 
Creating consumer #3 
Creating consumer #4 
Creating consumer #5 
Creating consumer #6 
Creating consumer #7 
Creating consumer #8 
Creating consumer #9 
Creating consumer #10 
Production complete. 

作爲一個初學者到多線程,我敢肯定,它可能是一些簡單的我俯瞰,我很欣賞的幫助。

回答

7

消費者和生產者都會執行一個睡眠(rand()),睡眠時間爲0到MAX_INT之間的隨機數,在你給主線程的例子中將會在10秒後終止。如果生產商的rand()值高於10,他們將永遠不會有機會產生任何東西。

+0

尼斯漁獲物。實際上,儘管我認爲他們已經通過調用來自多個線程的「rand」(這不是線程安全的)來調用UB,但可能同時也是如此。 – 2011-05-19 17:43:51

+0

我是否可以將我傳入的參數用於線程以解決此問題? – Mike 2011-05-19 17:48:42

+0

你可以嘗試用睡眠(1) – Rickard 2011-05-19 17:51:11

2

您不應該在主函數結束時直接調用exit。您應該先爲您在最後創建的線程調用pthread_join,以使主線程保持活動狀態,直到其他線程死亡。

+0

我會這樣做,看看它是否有效。 – Mike 2011-05-19 17:49:13

+1

..或使用一些其他方法來阻止主線程退出。加入所有這些線程意味着保持一個引用他們的原因和一個join()循環所有。大PITA。更容易等待輸入字符,或使用睡眠()循環,或什麼,比凌亂的join()的東西。 – 2013-03-06 17:38:37

3

你應該使用線程的數組開始...

pthread_t tid[argarray[1] + argarray[2]]; 

for(c1 = 0; c1 < argarray[1]; c1++) 
{ 
    pthread_create(&tid[c1], &attr, producer, NULL); 
    printf("Creating producer #%d\n", c1); 
} 
for(c1 = 0; c1 < argarray[2]; c1++) 
{ 
    pthread_create(&tid[c1 + argarray[1]], &attr, consumer, NULL); 
    printf("Creating consumer #%d\n", c1); 
} 

可能還有其他的問題,但是這是第一個我看看......

+0

如果線程ID未在應用程序中使用,則沒有必要保留它。通過在所有創建循環中使用'&tid'來有效地轉儲它是很好的。 – 2013-03-06 17:33:20

+0

它甚至可能是您可以傳遞NULL作爲pthread_t指針,但我沒有嘗試過。 – 2013-03-06 17:36:17