2015-02-09 104 views
3

我在C中編寫了一個通用隊列,用於各種有效載荷類型。它是一個阻塞隊列:消費者線程會阻塞,等待生產者線程向隊列中填充數據。問題是:一個線程在sem_wait上阻塞,導致其他所有線程掛起。

我已經使用Check單元測試框架單獨測試了隊列代碼,包括線程阻塞等待值被添加到隊列中的行爲。所有這些測試都通過了,但是,當將隊列集成到代碼的其餘部分時,我遇到了這樣的情況:第一次有線程試圖在隊列上阻塞時,所有其他線程都掛起了。

具體而言,我所整合的程序是一個更大的生態系統的成員,所以有一個啓動腳本來初始化程序,然後進行守護。守護程序線程然後創建幾個分離的線程來執行各種功能。其中一個線程調用sem_wait並掛起所有線程,包括產生守護進程的線程。

爲了確認這次調用是否就是問題所在,我使用調試器以非守護進程模式運行程序,調試器確認程序掛起在sem_wait上。我還在創建等待隊列的線程之前添加了一個sleep。在這種情況下,其他線程會多執行一段,然後在sem_wait被調用時掛起。

有問題的隊列只對這一個程序可見。其引用存儲爲全局變量。當執行sem_wait的呼叫時,隊列肯定是空的。

以下是隊列代碼:

//Queue.h 
#include <pthread.h> 
#include <semaphore.h> 

/*
 * Callback type stored in queue_t.freeFn. The queue invokes it with a
 * pointer to an element's stored data before that storage is freed;
 * presumably it releases resources owned by the payload (confirm against
 * the concrete callbacks, e.g. commandFree).
 */
typedef void (*freeFunction)(void *);

/* One link of the queue's singly linked list. */
typedef struct _queueNode {
    /* Heap buffer of elementSize bytes holding a copy of the queued element. */
    void *data;
    /* Next node toward the tail; NULL for the last node. */
    struct _queueNode *next;
} queueNode;


/*
 * Blocking FIFO queue of fixed-size elements.
 * Elements are copied in by queue_add() and copied out by queue_take().
 */
typedef struct queue {
    /* Semaphore posted once per queue_add() and waited on by queue_take(). */
    sem_t *logicalLength;
    /* Size in bytes of every element stored in the queue. */
    size_t elementSize;

    /* Oldest node (next to be taken); NULL when the queue is empty. */
    queueNode *head;
    /* Newest node (most recently added). */
    queueNode *tail;

    /* Optional callback applied to an element's data before it is freed; may be NULL. */
    freeFunction freeFn;
    /* Mutex (initialized as recursive) guarding head and tail. */
    pthread_mutex_t *queueLock;
} queue_t;

/* Sets up an already-allocated queue_t: empty list, semaphore at 0, recursive mutex. */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn);
/* Frees every remaining element, then destroys and frees the semaphore and mutex. */
void queue_destroy(queue_t *queue);

/* Returns the number of elements in the queue. */
/* NOTE(review): no definition of queue_size is visible in this excerpt - confirm it exists. */
int queue_size(queue_t *queue);

/* Appends a copy of elementSize bytes read from element to the tail. */
void queue_add(queue_t *queue, void *element);
/* Blocks until an element is available, copies the head into elementBuffer and
 * removes it. Returns EXIT_SUCCESS on success. */
int queue_take(queue_t *queue, void *elementBuffer);


//Queue.c 
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "Queue.h"

/*
 * Prepare an already-allocated queue_t for use.
 *
 * queue       - storage to initialize; must not be NULL.
 * elementSize - size in bytes of each element; must be greater than zero.
 * freeFn      - optional callback applied to element data before it is
 *               freed; may be NULL.
 *
 * The function returns void and so cannot report failure. Any failure to
 * allocate or initialize the semaphore or mutex is treated as fatal
 * (abort) rather than leaving a half-initialized queue behind.
 */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) {

    assert(elementSize > 0);
    assert(queue != NULL);

    queue->elementSize = elementSize;

    queue->head = NULL;
    queue->tail = NULL;

    queue->freeFn = freeFn;

    queue->logicalLength = calloc(1, sizeof *queue->logicalLength);
    queue->queueLock = calloc(1, sizeof *queue->queueLock);
    if (queue->logicalLength == NULL || queue->queueLock == NULL) {
        abort();
    }

    /* pshared = 0: the semaphore is shared between threads of this process only. */
    if (sem_init(queue->logicalLength, 0, 0) != 0) {
        abort();
    }

    pthread_mutexattr_t attr;
    if (pthread_mutexattr_init(&attr) != 0) {
        abort();
    }
    if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) {
        abort();
    }

    int rc = pthread_mutex_init(queue->queueLock, &attr);

    /* The attribute object is only needed during mutex creation. */
    pthread_mutexattr_destroy(&attr);

    if (rc != 0) {
        abort();
    }
}

/*
 * Tear down a queue: release every node still linked, then destroy and
 * free the mutex and the semaphore. The queue_t itself is not freed.
 * No locking is performed here.
 */
void queue_destroy(queue_t *queue) {

    assert(queue != NULL);

    /* Pop nodes off the head one at a time until the list is empty. */
    for (queueNode *victim = queue->head; victim != NULL; victim = queue->head) {

        queue->head = victim->next;

        if (queue->freeFn != NULL) {
            queue->freeFn(victim->data);
        }

        free(victim->data);
        free(victim);
    }

    queue->head = queue->tail = NULL;

    pthread_mutex_destroy(queue->queueLock);
    sem_destroy(queue->logicalLength);

    free(queue->queueLock);
    free(queue->logicalLength);
}

/*
 * Append a copy of *element to the tail of the queue and wake one waiter.
 *
 * queue   - initialized queue; must not be NULL.
 * element - points to queue->elementSize readable bytes; must not be NULL.
 *
 * The element is copied, so the caller keeps ownership of its own buffer.
 * The function returns void and cannot report failure, so an allocation
 * failure is treated as fatal (abort).
 */
void queue_add(queue_t *queue, void *element) {

    assert(queue != NULL);
    assert(element != NULL);

    /* Build the node before taking the lock so the critical section only
     * covers the list manipulation. */
    queueNode *node = calloc(1, sizeof *node);
    if (node == NULL) {
        abort();
    }

    node->data = calloc(1, queue->elementSize);
    if (node->data == NULL) {
        free(node);
        abort();
    }

    memcpy(node->data, element, queue->elementSize);
    node->next = NULL;

    pthread_mutex_lock(queue->queueLock);

    if (queue->head == NULL) {
        queue->head = queue->tail = node;
    } else {
        queue->tail->next = node;
        queue->tail = node;
    }

    pthread_mutex_unlock(queue->queueLock);

    /* Post after unlocking: a consumer woken by the semaphore would otherwise
     * immediately block again on the mutex this thread still holds. */
    sem_post(queue->logicalLength);
}

/*
 * Detach the head node and copy its payload into elementBuffer.
 *
 * queue         - initialized queue.
 * elementBuffer - receives queue->elementSize bytes.
 *
 * Does nothing (elementBuffer is left untouched) when the queue is empty.
 *
 * Ownership: the payload is copied bytewise, so any resources it refers to
 * now belong to the caller. freeFn is therefore NOT invoked here; calling it
 * would release resources the caller's copy still points at. Only the
 * queue's own storage for the node is freed.
 */
void queue_removeNode(queue_t *queue, void *elementBuffer) {

    pthread_mutex_lock(queue->queueLock);

    if (queue->head == NULL) {

        pthread_mutex_unlock(queue->queueLock);
        return;
    }

    queueNode *node = queue->head;
    memcpy(elementBuffer, node->data, queue->elementSize);

    /* Removing the only node leaves the queue empty. */
    if (queue->head == queue->tail) {
        queue->tail = NULL;
    }

    queue->head = node->next;

    pthread_mutex_unlock(queue->queueLock);

    /* The node is unlinked and private to this thread; free it outside the lock. */
    free(node->data);
    free(node);
}

/*
 * Remove the head element, blocking while the queue is empty.
 *
 * queue         - initialized queue; must not be NULL.
 * elementBuffer - receives queue->elementSize bytes; must not be NULL.
 *
 * Returns EXIT_SUCCESS after an element has been handed to
 * queue_removeNode(), or EXIT_FAILURE if waiting on the semaphore failed
 * for a reason other than signal interruption (elementBuffer is then left
 * untouched).
 */
int queue_take(queue_t *queue, void *elementBuffer) {

    assert(queue != NULL);
    assert(elementBuffer != NULL);

    /* sem_wait() fails with EINTR when interrupted by a signal handler;
     * that is not an error for a blocking take, so simply wait again. */
    while (sem_wait(queue->logicalLength) != 0) {
        if (errno != EINTR) {
            return EXIT_FAILURE;
        }
    }

    queue_removeNode(queue, elementBuffer);

    return EXIT_SUCCESS;
}

下面是其顯示該問題的代碼:

//fei.h 
... 
#include "Queue.h" 
extern queue_t *commandQueue; 
... 

//fei.c 
#include "fei.h" 
#include "commandHandler.h" 
#include "Queue.h" 

/* File-scope handle to the command queue used by receiveCommands() and
 * processCommands(); declared extern in fei.h. */
queue_t *commandQueue;

int main (int argc, char **argv){ 

    int debugFlag = handleOpts(argc, argv); 

    if(!debugFlag){ 
     int rc = daemonize(); 
     if(rc != 0){ 
      exit(rc); 
     } 
    } 

    rc = setConfigValues(); 
    if(rc){ 
     exit(rc); 
    } 

    queue_t *commandQueue = calloc(1, sizeof(queue_t)); 
    queue_initialize(commandQueue, sizeof(commandPack_t), commandFree); 

    if(getPortIsock() == 0){ // This is a simple config value 
     exit(EXIT_FAILURE); 
    } 

    signal(SIGPIPE, SIG_IGN); 

    pthread_t id; 
    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 
    pthread_create(&id, &attr, receiveCommands, NULL); 
    pthread_create(&id, &attr, processCommands, NULL); 

    if(!setSocketIsock()){ 
     exit(1); 
    } 
    while(!checkIfConnectedToSct()) 
     usleep(50000); 

    receiveCCSDSPackets(); 
    exit (0); 
} 

// commandHandler.c 
#include "Queue.h" 
#include "fei.h" 
#include "commandHandler.h" 

/* Declaration only: the queue handle is defined once, in fei.c. A second
 * definition here would be a duplicate symbol on toolchains that do not
 * merge common symbols (e.g. -fno-common). */
extern queue_t *commandQueue;

/*
 * Producer thread body (started from main via pthread_create).
 * Establishes the command socket connection, flags the program as
 * connected, then loops forever: each successfully received command is
 * copied onto commandQueue; otherwise the thread sleeps 5 ms and retries.
 */
void *receiveCommands(){

    getNewCsockConnection();
    connectedToSct = 1;

    for(;;){
        commandPack_t incoming;

        if(getCommand(CSOCKET, &incoming) != RECEIVE_SUCCESS){
            /* Nothing valid received; back off briefly before polling again. */
            usleep(5000);
            continue;
        }

        queue_add(commandQueue, &incoming);
    }
    return NULL;
}

/*
 * Consumer thread body (started from main via pthread_create).
 * Loops forever: blocks in queue_take() until a command is available,
 * dispatches on its command field, then passes the local copy to
 * commandFree().
 */
void *processCommands(){
    for(;;){
        commandPack_t pending;

        /* Blocks while the queue is empty. */
        queue_take(commandQueue, &pending);

        switch(pending.command){
            // Command processing
        }

        commandFree(&pending);
    }
    return NULL;
}

receiveCommands函數是生產者線程,processCommands函數是消費者線程。這些是代碼庫中僅有的引用commandQueue的地方。雖然情況並不固定,但主線程的執行很少能越過setSocketIsock()條件檢查。

任何洞察力是讚賞。

+0

在queue_add()中,通常的做法是在解鎖互斥鎖之後(而不是在持鎖期間)再發佈信號量,以防止取數據的線程立即變爲可運行、撞上鎖後再次停止,直到添加線程釋放鎖爲止。不過這並不是導致你程序鎖死的原因 :( – 2015-02-09 20:24:06

+0

謝謝。這是一個很好的觀點。我也知道,我應該檢查'sem_wait'的返回值,以防它沒有成功阻塞。可惜我的問題是,它阻塞得「太」成功了。 – nmogk 2015-02-09 21:20:18

回答

1

在main()中,你有這樣一行:

queue_t *commandQueue = calloc(1, sizeof(queue_t)); 

這使得commandQueue成爲main的一個局部變量。您的其他函數使用的是一個同樣名爲commandQueue的全局變量。這使我認爲你並不打算在main中重新聲明commandQueue。因此,請將上述行改爲:

commandQueue = calloc(1, sizeof(queue_t)); 
+0

Ah!Well spotted! – 2015-02-10 11:44:42

+0

就是這樣,簡單而陰險,謝謝你的幫助。 – nmogk 2015-02-10 16:43:19