2014-06-19 65 views
0

我在獲取基本的雙線程安排方面有點麻煩。如何讓兩個pthread線程響應對方的等待和信號條件?

我正在一個「生產者」線程中從stdin中讀取一大塊字節,並在第二個「消費者」線程中處理這些字節,一旦這些字節可用。一旦消耗了字節,消費者線程就回到休眠狀態,生產者線程再次運行。

我正在使用pthread_cond_wait()pthread_cond_signal()讓兩個線程互相溝通數據是生成還是消耗。

下面是兩個線程的代碼:

void * produce_bytes(void *t_data) 
{ 
    pthread_data_t *d = (pthread_data_t *)t_data; 

    do { 
     pthread_mutex_lock(&d->input_lock); 
     d->n_bytes = fread(d->in_buf, sizeof(unsigned char), BUF_LENGTH_VALUE, stdin); 
     if (d->n_bytes > 0) { 
      fprintf(stdout, "PRODUCER ...signaling consumer...\n"); 
      pthread_cond_signal(&d->input_cond); 
      fprintf(stdout, "PRODUCER ...consumer signaled...\n"); 
     } 
     pthread_mutex_unlock(&d->input_lock); 
    } while (d->n_bytes > 0); 

    return NULL; 
} 

void * consume_bytes(void *t_data) 
{ 
    pthread_data_t *d = (pthread_data_t *)t_data; 

    pthread_mutex_lock(&d->input_lock); 
    while (d->n_bytes == 0) 
     pthread_cond_wait(&d->input_cond, &d->input_lock); 
    fprintf(stdout, "CONSUMER ...consuming chunk...\n"); 
    d->n_bytes = 0; 
    fprintf(stdout, "CONSUMER ...chunk consumed...\n"); 
    pthread_mutex_unlock(&d->input_lock); 
} 

pthread_data_t是我用它來跟蹤狀態的結構:

typedef struct { 
    pthread_mutex_t input_lock; 
    pthread_cond_t input_cond; 
    unsigned char in_buf[BUF_LENGTH_VALUE]; 
    size_t n_bytes; 
} pthread_data_t; 

我配置我main()函數變量;這裏是有關摘錄:

pthread_t producer_thread = NULL; 
pthread_t consumer_thread = NULL; 
pthread_data_t *thread_data = NULL; 

thread_data = malloc(sizeof(pthread_data_t)); 
thread_data->n_bytes = 0; 
pthread_mutex_init(&(thread_data->input_lock), NULL); 
pthread_cond_init(&(thread_data->input_cond), NULL); 

pthread_create(&producer_thread, NULL, produce_bytes, (void *) thread_data); 
pthread_create(&consumer_thread, NULL, consume_bytes, (void *) thread_data); 

pthread_join(producer_thread, NULL); 
pthread_join(consumer_thread, NULL); 

當我運行此,produce_bytes()信號consume_bytes()成功地在第一次循環,但在第二和後續的迭代,信號被髮送到consume_bytes(),它永遠不會被聽到,所以消費者功能永遠不會被再次運行:

PRODUCER ...signaling consumer... 
PRODUCER ...consumer signaled... 
CONSUMER ...consuming chunk... 
CONSUMER ...chunk consumed... 
PRODUCER ...signaling consumer... 
PRODUCER ...consumer signaled... 
PRODUCER ...signaling consumer... 
PRODUCER ...consumer signaled... 
PRODUCER ...signaling consumer... 
PRODUCER ...consumer signaled... 
... 

我使用的教程here至於是什麼,我試圖做的基礎。我做錯了什麼?

+2

您的消費者的*外部循環*在哪裏?你有一個給你的製片人,但你的消費者似乎只關心首次到來。你的信號機制也可以清理一下。仔細看看你的消費線程,問自己在謂詞變爲真和'while循環中斷後會發生什麼。是否有*任何*現在會再次返回到謂詞循環? – WhozCraig

+0

如果我嘗試''pthread_create'並在'produce_bytes()'函數內加入一個新的使用者線程,那麼一旦我有字節消耗,程序就會掛起。你能否添加一個答案來提出一種解決方法? –

+0

是否有意添加任意數量的消費者?它有所作爲。無論如何,我幾乎已經爲你完成了單個/單個示例。 – WhozCraig

回答

1

有與該代碼的幾個問題:

  1. produce_bytes鎖定爲阻止呼叫fread期間互斥。響應式應用程序的一般經驗法則是儘可能短地鎖定互斥鎖。您可能首先將輸入讀入臨時緩衝區,然後鎖定互斥鎖並將數據複製到線程之間共享的緩衝區。同樣適用於consume_bytes,它持有互斥鎖,同時調用fprintf可以阻止。
  2. produce_bytes in while(d->n_bytes > 0)不包含互斥體,這是一種競爭條件,因爲consume_bytes將一個新值分配給d->n_bytes。假設你想退出該循環時,fread返回0(EOF),你需要的fread的返回值複製到線程之間不能共享本地變量和使用,作爲條件while(read_bytes > 0)
  3. consume_bytes沒有任何環圍繞它,以便它在第一個條件變量通知之後返回。您可能想將其包裝到while循環中,並且只有在讀取了EOF(0字節)時才退出。
+0

謝謝,這非常有幫助。 –

+0

+1不錯的答案。關於coliru的一個例子[**可以在這裏看到**](http://coliru.stacked-crooked.com/a/6cdcdd228d000fe4)。我解決了讀取輸入緩衝區的互斥鎖問題。對不起,任何錯別字或跛腳的錯誤(缺乏睡眠的事情),但希望亞歷克斯得到的想法。 – WhozCraig

0

下面是一個工作示例,它涉及Maxim的第2點和第3點,但不是1,因爲這是響應性所必需的,但不是嚴格的正確性。

請注意,我沒有實施生產者向消費者發信號的手段,因此消費者將永遠不會退出。

#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 

#define BUF_LENGTH_VALUE 100 
typedef struct { 
    pthread_mutex_t input_lock; 
    pthread_cond_t input_cond; 
    unsigned char in_buf[BUF_LENGTH_VALUE]; 
    size_t n_bytes; 
} pthread_data_t; 

void * produce_bytes(void *t_data) 
{ 
    pthread_data_t *d = (pthread_data_t *)t_data; 

    size_t local_byte_count = 0; 
    do { 
     pthread_mutex_lock(&d->input_lock); 
     local_byte_count = fread(d->in_buf, sizeof(unsigned char), 
       BUF_LENGTH_VALUE, stdin); 

     d->n_bytes += local_byte_count; 
     if (d->n_bytes > 0) { 
      fprintf(stdout, "PRODUCER ...signaling consumer...\n"); 
      pthread_cond_signal(&d->input_cond); 
      fprintf(stdout, "PRODUCER ...consumer signaled...\n"); 
     } 
     pthread_mutex_unlock(&d->input_lock); 

     // This is added to slow down the producer so that we can observe 
     // multiple consumptions. 
     sleep(1); 
    } while (local_byte_count > 0); 



    return NULL; 
} 

void * consume_bytes(void *t_data) 
{ 
    pthread_data_t *d = (pthread_data_t *)t_data; 
    while (1) { 
     pthread_mutex_lock(&d->input_lock); 
     while (d->n_bytes == 0) { 
      fprintf(stdout, "CONSUMER entering wait \n"); 
      pthread_cond_wait(&d->input_cond, &d->input_lock); 
     } 
     fprintf(stdout, "CONSUMER ...consuming chunk...\n"); 
     d->n_bytes = 0; 
     fprintf(stdout, "CONSUMER ...chunk consumed...\n"); 
     pthread_mutex_unlock(&d->input_lock); 
     fflush(stdout); 
    } 

} 

int main(){ 
    pthread_t producer_thread = NULL; 
    pthread_t consumer_thread = NULL; 
    pthread_data_t *thread_data = NULL; 

    thread_data = malloc(sizeof(pthread_data_t)); 
    thread_data->n_bytes = 0; 
    pthread_mutex_init(&(thread_data->input_lock), NULL); 
    pthread_cond_init(&(thread_data->input_cond), NULL); 

    pthread_create(&producer_thread, NULL, produce_bytes, (void *) thread_data); 
    pthread_create(&consumer_thread, NULL, consume_bytes, (void *) thread_data); 

    pthread_join(producer_thread, NULL); 
    pthread_join(consumer_thread, NULL); 
}