2012-05-28 39 views
3

我製作了一個循環緩衝區,其中有多個客戶端將不同長度的消息寫入緩衝區。服務器讀出它們。它基於代碼和消費者/生產者問題。 問題是,當緩衝區已滿並且服務器從緩衝區中移除所有數據時,客戶端將發出信號以恢復其寫入操作,但是另一客戶端(在另一個線程中)開始將其消息寫入緩衝區。我希望在緩衝區已滿之前已經寫入的客戶端恢復其操作,以便消息不會無序到達。c中的一個消費者多個生產者在完全緩衝後恢復時阻止競爭

這是我的代碼(我刪除了大量的測試代碼)

#include <stdio.h> 
#include <malloc.h> 
#include <string.h> 
#include <pthread.h> 
#include <unistd.h> 

#define BUFFER_SIZE 8 
#define NUM_THREADS 4 

struct cBuf{ 
    char *buf; 
    int  size; 
    int  start; 
    int  end; 
    pthread_mutex_t mutex; 
    pthread_cond_t buffer_full; 
    pthread_cond_t buffer_empty; 
}; 

struct cBuf cb; 


void buf_Init(struct cBuf *cb, int size) { 
    int i; 
    cb->size = size + 1; 
    cb->start = 0; 
    cb->end = 0; 
    cb->buf = (char *)calloc(cb->size, sizeof(char)); 
    for (i=0;i<size;i++) cb->buf[i]='_'; 

} 

void buf_Free(struct cBuf *cb) { 
    free(cb->buf); 
} 

int buf_IsFull(struct cBuf *cb) { 
    return (cb->end + 1) % cb->size == cb->start; 
} 

int buf_IsEmpty(struct cBuf *cb) { 
    return cb->end == cb->start; 
} 

int buf_Insert(struct cBuf *cb, char *elem) { 

    int i,j; 

    pthread_mutex_lock(&(cb->mutex)); 
    for (i=0; i < strlen(elem); ++ i){ 
     if (buf_IsFull(cb)==1) printf("\nProducer (buf_Insert) is waiting because of full buffer"); 
     while(buf_IsFull(cb)){ 
      pthread_cond_signal(&(cb->buffer_full));    
      pthread_cond_wait(&(cb->buffer_empty),&(cb->mutex)); 
     } 
     cb->buf[cb->end] = elem[i]; 
     cb->end = (cb->end + 1) % cb->size;  
     printf("%c [INPUT]",elem[i]); 
    } 

    pthread_cond_signal(&(cb->buffer_full)); 
    pthread_mutex_unlock(&(cb->mutex));  
    return 0;  
} 

int buf_Read(struct cBuf *cb, char *out) { 
    int i,j; 

    pthread_mutex_lock(&(cb->mutex)); 
    if (buf_IsEmpty(cb))printf("\nConsumer (buf_Read) is waiting because of empty buffer\n"); 
    while(buf_IsEmpty(cb)){ 
     pthread_cond_wait(&(cb->buffer_full),&(cb->mutex)); 
    } 
    for (i=0;i<BUFFER_SIZE-1;i++){ 
    printf("\n"); 
     if (cb->start == cb->end) break;   
     out[i] = cb->buf[cb->start]; 
     cb->buf[cb->start] = '_'; 
     cb->start = (cb->start + 1) % cb->size; 
     printf("%c [OUTPUT]",out[i]); 
    } 
    pthread_cond_signal(&(cb->buffer_empty)); 
    pthread_mutex_unlock(&(cb->mutex)); 
    return 0; 
} 

void * client(void *cb){ 

    pthread_detach(pthread_self()); 

    struct cBuf *myData; 
    myData = (struct cBuf*) cb; 
    char input[]="Hello World!"; 
    if (buf_Insert(myData, input)){ 
     //succes on return 0 
     printf("\n"); 
    } 

    return 0; 
} 

int main(void) { 
    char out[60]; 
    pthread_t thread; 
    int i; 
    /* Initialise conditioners*/ 
    pthread_cond_init(&(cb.buffer_full),NULL); 
    pthread_cond_init(&(cb.buffer_empty),NULL); 

    buf_Init(&cb, BUFFER_SIZE); 

    for (i = 0; i<NUM_THREADS; i++){ 
      if(pthread_create (&thread,NULL, client, (void *) &cb) !=0){ 
     } else { 

     } 
    } 

    while (1){ 
     if (buf_Read(&cb,out)){ 
     } 
    } 

    //empty the buffer; free the allocated memory 
    buf_Free(&cb); 
    return 0; 
} 
+0

在生產者和消費者之間使用一個互斥體來實現數據完整性。使用另一個只允許一個生產者一次訪問緩衝區的互斥鎖。 – Lundin

+0

@Lundin:不,使用鎖和條件變量似乎很好。部分信息的想法是問題。 –

回答

3

我已經在評論中Producer/consumer seems to be in deadlock when buffer is smaller than input from producer解釋,但那些評論,所以這裏去爲答案:

你永遠不應該隊列中有部分消息。確保你從不寫一個。

您可以在開始寫入消息之前檢查是否有足夠的空間,如果沒有,請立即等待buffer_empty,或者您可以更改隊列以將共享指針發送到分配的數據(將所有權傳遞給使用者或參考 - 計數)或其他東西,因此每條消息只佔用隊列中的一個插槽併爲剩下的分配內存。什麼是最好的取決於消息的確切性質。只要沒有部分信息,任何事情都會發生。

儘管可以記錄哪個特定的作者需要完成一條消息並喚醒它,但它會非常複雜。同步雖然很困難,但通過對其進行額外的要求,不要讓它更難。

事實上,除非這是一項家庭作業(從某種意義上說,您可以瞭解同步的工作原理),只需查找現成的消息隊列即可。在數據報模式下的SysV-IPC或unix域套接字是想到的兩個選項,或者尋找一些可用的庫。

相關問題