我在C中編寫了一個通用隊列,可用於各種有效載荷類型。它是一個阻塞隊列:消費者線程會阻塞,直到生產者線程向隊列中放入數據。(問題標題:線程在sem_wait上阻塞導致其他線程掛起)
我已經使用check
隔離測試了隊列代碼,包括線程阻塞等待將值添加到隊列中的行爲。所有這些測試都通過了,但是,當將隊列集成到代碼的其餘部分時,我遇到了第一次線程試圖阻塞隊列時所有其他線程掛起的情況。
具體而言,我要集成的程序是一個更大系統中的一員,因此有一個啓動腳本負責初始化程序,隨後程序將自身轉爲守護進程(daemonize)。守護進程隨後創建若干分離(detached)線程來執行各種功能。其中一個線程調用sem_wait
後,所有線程都被掛起,包括創建守護進程的那個線程。
爲了確認問題是否出在這次調用上,我在調試器中以非守護進程模式運行程序,調試器確認程序掛起在sem_wait
上。我還在創建等待隊列的線程之前加入了sleep
。這種情況下,其他線程能多執行一段,然後在sem_wait
被調用時掛起。
有問題的隊列只對這一個程序可見。其引用存儲爲全局變量。當執行sem_wait
的呼叫時,隊列肯定是空的。
以下是隊列代碼:
//Queue.h
/*
 * Generic blocking FIFO queue. Elements are fixed-size byte blobs that are
 * copied into the queue on add and copied out into a caller buffer on take.
 *
 * The include guard is required: the same header is pulled in both directly
 * and through fei.h by the files that use it, and without a guard the struct
 * definitions below would be seen twice in one translation unit.
 */
#ifndef QUEUE_H
#define QUEUE_H

#include <stddef.h>
#include <pthread.h>
#include <semaphore.h>

/*
 * Element cleanup callback. The queue invokes it on an element's stored copy
 * before freeing that copy's own storage. May be NULL when elements own no
 * additional resources.
 */
typedef void (*freeFunction)(void *);

/* One link of the singly linked list backing the queue. */
typedef struct _queueNode {
    void *data;               /* heap copy of the element (elementSize bytes) */
    struct _queueNode *next;  /* next node towards the tail, NULL at the tail */
} queueNode;

/* Queue state. Initialise with queue_initialize() before any other call. */
typedef struct queue {
    sem_t *logicalLength;        /* semaphore posted once per added element */
    size_t elementSize;          /* size in bytes of every element */
    queueNode *head;             /* oldest element, NULL when empty */
    queueNode *tail;             /* newest element, NULL when empty */
    freeFunction freeFn;         /* optional element cleanup callback */
    pthread_mutex_t *queueLock;  /* guards head/tail and the node list */
} queue_t;

/* Sets up an empty queue whose elements are elementSize (> 0) bytes each. */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn);

/* Removes all elements from the queue and releases its semaphore and mutex. */
void queue_destroy(queue_t *queue);

/* Returns the number of elements in the queue. */
int queue_size(queue_t *queue);

/* Copies *element (elementSize bytes) and appends the copy at the tail. */
void queue_add(queue_t *queue, void *element);

/* Removes the head element into elementBuffer; blocks while the queue is empty. */
int queue_take(queue_t *queue, void *elementBuffer);

#endif /* QUEUE_H */
//Queue.c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "Queue.h"
/*
 * Prepare a caller-allocated queue_t for use as an empty blocking queue.
 *
 * queue       - queue object to initialise; must not be NULL.
 * elementSize - size in bytes of each element; must be > 0.
 * freeFn      - optional cleanup callback for elements, may be NULL.
 *
 * The function returns void, so failure is reported through the object
 * itself: if any allocation or primitive initialisation fails, everything
 * acquired so far is released and both queue->logicalLength and
 * queue->queueLock are left NULL. Callers can test either pointer.
 */
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) {
    pthread_mutexattr_t attr;
    int rc;

    assert(elementSize > 0);
    assert(queue != NULL);

    queue->elementSize = elementSize;
    queue->head = NULL;
    queue->tail = NULL;
    queue->freeFn = freeFn;
    queue->logicalLength = calloc(1, sizeof *queue->logicalLength);
    queue->queueLock = calloc(1, sizeof *queue->queueLock);
    if (queue->logicalLength == NULL || queue->queueLock == NULL) {
        goto fail_alloc;
    }

    /* pshared = 0 (threads of this process only), initial count 0 (empty). */
    if (sem_init(queue->logicalLength, 0, 0) != 0) {
        goto fail_alloc;
    }

    if (pthread_mutexattr_init(&attr) != 0) {
        goto fail_sem;
    }
    rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    if (rc == 0) {
        rc = pthread_mutex_init(queue->queueLock, &attr);
    }
    /* The attribute object is only needed during mutex creation. */
    pthread_mutexattr_destroy(&attr);
    if (rc != 0) {
        goto fail_sem;
    }
    return;

fail_sem:
    sem_destroy(queue->logicalLength);
fail_alloc:
    free(queue->queueLock);
    free(queue->logicalLength);
    queue->queueLock = NULL;
    queue->logicalLength = NULL;
}
/*
 * Tear down a queue: discard every element still stored, then destroy and
 * free the mutex and the semaphore. The queue_t object itself is not freed.
 *
 * For each remaining element the optional freeFn callback runs on the stored
 * data first, after which the data buffer and the node are freed.
 * No locking is performed here.
 */
void queue_destroy(queue_t *queue) {
    assert(queue != NULL);

    for (queueNode *victim = queue->head; victim != NULL; victim = queue->head) {
        /* Unlink before releasing so head never points at freed memory. */
        queue->head = victim->next;
        if (queue->freeFn) {
            queue->freeFn(victim->data);
        }
        free(victim->data);
        free(victim);
    }
    queue->head = NULL;
    queue->tail = NULL;

    pthread_mutex_destroy(queue->queueLock);
    sem_destroy(queue->logicalLength);
    free(queue->queueLock);
    free(queue->logicalLength);
}
/*
 * Append a copy of *element to the tail of the queue and wake one waiter.
 *
 * queue   - initialised queue; must not be NULL.
 * element - source of queue->elementSize bytes to copy; must not be NULL.
 *
 * The copy is shallow (memcpy). If memory for the node or the copy cannot be
 * allocated the element is dropped and the semaphore is not posted; the void
 * return type leaves no way to report this to the caller.
 */
void queue_add(queue_t *queue, void *element) {
    assert(queue != NULL);
    assert(element != NULL);

    /* Build the node before taking the lock to keep the critical section short. */
    queueNode *node = calloc(1, sizeof *node);
    if (node == NULL) {
        return;
    }
    node->data = calloc(1, queue->elementSize);
    if (node->data == NULL) {
        free(node);
        return;
    }
    memcpy(node->data, element, queue->elementSize);
    node->next = NULL;

    pthread_mutex_lock(queue->queueLock);
    if (queue->head == NULL) {
        queue->head = queue->tail = node;
    } else {
        queue->tail->next = node;
        queue->tail = node;
    }
    pthread_mutex_unlock(queue->queueLock);

    /*
     * Post only after unlocking: a consumer woken by the post can take the
     * mutex straight away instead of blocking on it until we release it.
     */
    sem_post(queue->logicalLength);
}
/*
 * Detach the head node and copy its element into elementBuffer.
 *
 * queue         - initialised queue.
 * elementBuffer - destination with room for queue->elementSize bytes.
 *
 * Does nothing (elementBuffer untouched) when the queue is empty.
 *
 * Ownership: the element is copied out with memcpy, so any resources it
 * refers to are now reachable through elementBuffer and belong to the
 * caller. freeFn is therefore NOT run here; running it on the stored copy
 * would release resources the caller is about to use and later clean up
 * itself. Only the queue's own storage for the element is freed.
 */
void queue_removeNode(queue_t *queue, void *elementBuffer) {
    pthread_mutex_lock(queue->queueLock);

    queueNode *node = queue->head;
    if (node == NULL) {
        pthread_mutex_unlock(queue->queueLock);
        return;
    }

    memcpy(elementBuffer, node->data, queue->elementSize);

    /* Removing the only node empties the list, so the tail must go too. */
    if (queue->head == queue->tail) {
        queue->tail = NULL;
    }
    queue->head = node->next;

    pthread_mutex_unlock(queue->queueLock);

    /* The node is unlinked and private to this thread; free it outside the lock. */
    free(node->data);
    free(node);
}
/*
 * Remove the head element into elementBuffer, blocking while the queue is
 * empty.
 *
 * queue         - initialised queue; must not be NULL.
 * elementBuffer - destination with room for queue->elementSize bytes; must
 *                 not be NULL.
 *
 * Returns EXIT_SUCCESS once the wait on the semaphore succeeded and the
 * removal was attempted, or EXIT_FAILURE if sem_wait failed for a reason
 * other than being interrupted (elementBuffer is then left untouched).
 */
int queue_take(queue_t *queue, void *elementBuffer) {
    assert(queue != NULL);
    assert(elementBuffer != NULL);

    /*
     * sem_wait returns -1 with errno == EINTR when a signal handler
     * interrupts it; the semaphore was not decremented in that case, so the
     * wait has to be retried rather than treated as "an element is ready".
     */
    while (sem_wait(queue->logicalLength) != 0) {
        if (errno != EINTR) {
            return EXIT_FAILURE;
        }
    }

    queue_removeNode(queue, elementBuffer);
    return EXIT_SUCCESS;
}
下面是出現該問題的代碼:
//fei.h
...
#include "Queue.h"
/* Declaration only: makes the shared command queue pointer visible to every file including this header. */
extern queue_t *commandQueue;
...
//fei.c
#include "fei.h"
#include "commandHandler.h"
#include "Queue.h"
/* Command queue pointer shared with the receiveCommands and processCommands threads; declared extern in fei.h. */
queue_t *commandQueue;
int main (int argc, char **argv){
int debugFlag = handleOpts(argc, argv);
if(!debugFlag){
int rc = daemonize();
if(rc != 0){
exit(rc);
}
}
rc = setConfigValues();
if(rc){
exit(rc);
}
queue_t *commandQueue = calloc(1, sizeof(queue_t));
queue_initialize(commandQueue, sizeof(commandPack_t), commandFree);
if(getPortIsock() == 0){ // This is a simple config value
exit(EXIT_FAILURE);
}
signal(SIGPIPE, SIG_IGN);
pthread_t id;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&id, &attr, receiveCommands, NULL);
pthread_create(&id, &attr, processCommands, NULL);
if(!setSocketIsock()){
exit(1);
}
while(!checkIfConnectedToSct())
usleep(50000);
receiveCCSDSPackets();
exit (0);
}
// commandHandler.c
#include "Queue.h"
#include "fei.h"
#include "commandHandler.h"
/*
 * Declaration only. The queue pointer is defined once in fei.c; defining it
 * again in this file would create a second external definition of the same
 * object, which fails to link on toolchains that do not merge common symbols.
 */
extern queue_t *commandQueue;
/*
 * Producer thread entry point.
 *
 * Calls getNewCsockConnection(), sets connectedToSct to 1, then loops
 * forever: each command read successfully from CSOCKET is copied into
 * commandQueue; after any unsuccessful read the thread sleeps for 5000
 * microseconds before trying again. The trailing return is never reached.
 */
void *receiveCommands(){
    getNewCsockConnection();
    connectedToSct = 1;

    for(;;){
        commandPack_t incoming;

        if(getCommand(CSOCKET, &incoming) != RECEIVE_SUCCESS){
            /* Nothing usable was read; back off briefly before retrying. */
            usleep(5000);
            continue;
        }

        /* queue_add copies the struct, so reusing the local next pass is fine. */
        queue_add(commandQueue, &incoming);
    }

    return NULL;
}
/*
 * Consumer thread entry point.
 *
 * Loops forever: takes the next command from commandQueue, dispatches on its
 * command field, then calls commandFree() on the local copy. The trailing
 * return is never reached.
 */
void *processCommands(){
while(1){
commandPack_t cmdToProcess;
/* queue_take blocks while the queue is empty, then copies the head element into cmdToProcess. */
queue_take(commandQueue, &cmdToProcess);
switch(cmdToProcess.command){
// Command processing
}
/* Caller-side cleanup of the dequeued copy. */
commandFree(&cmdToProcess);
}
return NULL;
}
receiveCommands
函數是生產者線程,processCommands
函數是消費者線程。這些是代碼庫中僅有的引用commandQueue
的地方。雖然情況不盡相同,但主線程的執行很少會越過setSocketIsock()
條件檢查。
如有任何見解,不勝感激。
在queue_add()中,通常的做法是在解鎖互斥鎖之後再對信號量執行post,而不是在持鎖期間執行,以避免取數據的線程立即變爲可運行、撞上鎖後又再次停住,直到添加線程釋放鎖爲止。不過這並不是導致你程序掛起的原因 :( – 2015-02-09 20:24:06
謝謝。這是一個很好的觀點。我也知道應該檢查'sem_wait'的返回值,以防它沒有成功阻塞。可惜我的問題是,它阻塞得太成功了 – nmogk 2015-02-09 21:20:18