2012-08-31 469 views
1

我想實現生產者 - 消費者場景,其中有一個生產者線程和多個消費者線程。 實際上讓我明白,生產者線程需要定期創建一組對象(例如5秒),消費者線程需要消耗這些對象。同步1個生產者和多個消費者(多線程)

我不確定如何以固定間隔創建一組對象以及如何合併多個使用者。

在此先感謝

回答

2

究竟是什麼,你有問題?傳統上,工作隊列受條件鎖保護;如果多個「消費者」在同一個鎖上等待,那麼只有其中一個會成功實際獲得它。但是,不應抓取整個隊列,而應該從中抓取一個對象,將其從隊列中移除,然後在處理作業之前釋放鎖。

至於定期創建作業,這是您正在使用的任何庫中定時器類的工作。如果您正在等待輸入,則select()和poll()調用會有一個超時值;如果你沒有做任何事情,你可以只是叫[睡眠]()。

+0

+1的答案,不給解決,因爲這是最肯定的暗示計數 - 信標信號的功課 – AndyG

1

我不會說出所有的代碼,但是你可以爲這個(和一個鏈表)使用一個互斥和一個條件變量。其基本模式是製作者:

loop_forever 
    wait_until_interval_has_elapsed 
    lock_the_mutex 
     append_an_item_to_the_list 
     signal_the_condvar // can be outside the mutex 
    unlock_the_mutex 

每個消費者的作用:

loop_forever 
    wait_until_interval_has_elapsed 
    lock_the_mutex 
     append_an_item_to_the_list 
    unlock_the_mutex 
    post_the_semaphore 

loop_forever 
    wait_the_semaphore 
    lock_the_mutex 
     remove_an_item_from_the_list 
    unlock_the_mutex 
    process_the_item 

這是稍微簡單:

loop_forever 
    lock_the_mutex 
     while(list_is_empty) 
      wait_the_condvar 
     remove_an_item_from_the_list 
    unlock_the_mutex 
    process_the_item 

你也可以用一個互斥量和信號量做但是一旦你決定添加一種機制來告訴消費者完成他們正在做的事情並退出,那麼使用條件變量可能會更好,因爲你可以「廣播」它。信號量很容易被誤用,因此C++ 11沒有信號量。 Posix的確如此,所以在linux上你有選擇。公平的說,其他多線程操作系統也是如此。

在線程具有不同優先級的情況下,互斥/ condvar對可能會給出比信號量更好的行爲。與互斥量不同,信號量沒有「所有者」,因此像優先級繼承這樣的技術是不可能的。

對於wait_until_interval_has_elapsed使用定時器或睡眠功能 - 請注意,除非您使用的是實時系統,否則您永遠無法確定您能夠在特定時間運行,只有您會被喚醒或在特定時間之後。

+0

+1可用小部件的數量。 –

1

您將需要一種方法來告訴消費者線程有新的對象可用。這裏有一個例子(我沒有包括#include S):

#define NUM_CONSUMERS 2 

static int objects[4]; //The place for produced objects 
static pthread_mutex_t cond_mut = PTHREAD_MUTEX_INITIALIZER; 
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 

void* producer_func(void* arg); 
void* consumer_func(void* arg); 

int main(int argc, char** argv) { 
    pthread_t producer; 
    pthread_t consumers[NUM_CONSUMERS]; 
    int res; 
    void* zero = 0; 
    res = pthread_create(&producer, NULL, producer_func, zero); 
    for(int i = 0; i < NUM_CONSUMERS; i++) { 
    res = pthread_create(&consumers[i], NULL, consumer_func, zero); 
    } 
} 

void* producer(void* arg) { 
    while(1) { 
    objects[0] = 0; 
    objects[1] = 1; 
    objects[2] = 2; 
    objects[3] = 3; 
    pthread_mutex_lock(&cond_mut); 
    pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&cond_mut); 
    sleep(5); 
    } 
} 

void* consumer(void* arg) { 
    while(1) { 
    pthread_mutex_lock(&cond_mut); 
    pthread_cond_wait(&cond); 
    pthread_mutex_unlock(&cond_mut); 
    //Process objects here 
    } 
} 
相關問題