-1
我需要解決生產者-消費者問題,並且必須採用單一進程、在進程內使用多個線程的解決方案。該進程的主例程應在全局內存中設置並初始化所需的環形緩衝區和控制變量,使進程中的所有其他線程都能看到它們。接著,主例程會創建若干生產者線程和若干消費者線程。(標題:使用環形緩衝區和線程的生產者-消費者)
我在互聯網上做了大量的研究,並且寫出了一些代碼。然而,它似乎無法正確運作:無論我使用什麼數值,程序都會陷入死鎖。有人可以幫我嗎?
我的頭文件:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/signal.h>
#include <sys/time.h>
/* Sizing constants for the donut-shop producer/consumer program. */
#define NUMFLAVORS   4      /* number of donut flavors; one ring per flavor */
#define NUMSLOTS     400    /* slots in each flavor's ring */
#define NUMCONSUMERS 50     /* consumer count used to size the thread tables */
#define NUMPRODUCERS 30     /* producer count used to size the thread tables */
#define NoOfDozens   200    /* dozen-count limit; presumably dozens per consumer -- TODO confirm */

/*
 * Ring-buffer state shared by all threads.  Every array is indexed by
 * flavor in its first dimension.
 */
struct DONUT_SHOP {
    int flavor[NUMFLAVORS][NUMSLOTS];   /* ring storage: donut serial numbers */
    int outptr[NUMFLAVORS];             /* next slot a consumer reads */
    int in_ptr[NUMFLAVORS];             /* next slot a producer writes */
    int serial[NUMFLAVORS];             /* per-flavor serial counter, zeroed at start-up */
};
我的程序:
/* Thread routine: blocks in sigwait() for SIGTERM/SIGINT, then exits the process. */
void *sig_waiter (void *arg);
/* Thread routine: loops forever adding serial-numbered donuts to the shared rings. */
void *producer (void *arg);
/* Thread routine: removes donuts from the shared rings; arg points to an int consumer id. */
void *consumer (void *arg) ;
/* Handler installed by main for SIGBUS, SIGSEGV and SIGFPE: reports the signal and exits. */
void sig_handler (int);
/****************************/
/* GLOBAL VARIABLES */
/****************************/
#include "ml_pc.h"
struct DONUT_SHOP shared_ring;
int space_count [NUMFLAVORS];
int donut_count [NUMFLAVORS];
int serial [NUMFLAVORS];
pthread_mutex_t prod [NUMFLAVORS];
pthread_mutex_t cons [NUMFLAVORS];
pthread_cond_t prod_cond [NUMFLAVORS];
pthread_cond_t cons_cond [NUMFLAVORS];
pthread_t thread_id [NUMCONSUMERS+1], sig_wait_id;
int main (int argc, char *argv[])
{
int i, j, k, nsigs;
struct timeval randtime, first_time, last_time;
struct sigaction new_act;
int arg_array[NUMCONSUMERS];
sigset_t all_signals;
int sigs[] = { SIGBUS, SIGSEGV, SIGFPE };
pthread_attr_t thread_attr;
struct sched_param sched_struct;
/**********************************************************************/
/* INITIAL TIMESTAMP VALUE FOR PERFORMANCE MEASURE */
/**********************************************************************/
gettimeofday (&first_time, (struct timezone *) 0);
for (i = 0; i < NUMCONSUMERS + 1 ; i++) {
arg_array [i] = i+1; /** SET ARRAY OF ARGUMENT VALUES **/
}
/**********************************************************************/
/* GENERAL PTHREAD MUTEX AND CONDITION INIT AND GLOBAL INIT */
/**********************************************************************/
for (i = 0; i < NUMFLAVORS; i++) {
pthread_mutex_init (&prod [i], NULL);
pthread_mutex_init (&cons [i], NULL);
pthread_cond_init (&prod_cond [i], NULL);
pthread_cond_init (&cons_cond [i], NULL);
shared_ring.outptr [i] = 0;
shared_ring.in_ptr [i] = 0;
shared_ring.serial [i] = 0;
space_count [i] = NUMSLOTS;
donut_count [i] = 0;
}
/**********************************************************************/
/* SETUP FOR MANAGING THE SIGTERM SIGNAL, BLOCK ALL SIGNALS */
/**********************************************************************/
sigfillset (&all_signals);
nsigs = sizeof (sigs)/sizeof (int);
for (i = 0; i < nsigs; i++){
sigdelset (&all_signals, sigs [i]);
}
sigprocmask (SIG_BLOCK, &all_signals, NULL);
sigfillset (&all_signals);
for(i = 0; i < nsigs; i++) {
new_act.sa_handler = sig_handler;
new_act.sa_mask = all_signals;
new_act.sa_flags = 0;
if (sigaction (sigs[i], &new_act, NULL) == -1){
perror("can't set signals: ");
exit(1);
}
}
printf ("just before threads created\n");
/*********************************************************************/
/* CREATE SIGNAL HANDLER THREAD, PRODUCER AND CONSUMERS */
/*********************************************************************/
if (pthread_create (&sig_wait_id, NULL,
sig_waiter, NULL) != 0){
printf ("pthread_create failed ");
exit (3);
}
pthread_attr_init (&thread_attr);
pthread_attr_setinheritsched (&thread_attr,
PTHREAD_INHERIT_SCHED);
#ifdef GLOBAL
sched_struct.sched_priority = sched_get_priority_max(SCHED_OTHER);
pthread_attr_setinheritsched (&thread_attr,
PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy (&thread_attr, SCHED_OTHER);
pthread_attr_setschedparam (&thread_attr, &sched_struct);
pthread_attr_setscope (&thread_attr,
PTHREAD_SCOPE_SYSTEM);
#endif
if (pthread_create (&thread_id[0], &thread_attr,
producer, NULL) != 0) {
printf ("pthread_create failed ");
exit (3);
}
for (i = NUMPRODUCERS; i < NUMCONSUMERS + 1; i++) {
if (pthread_create (&thread_id [i], &thread_attr,
consumer, (void *)&arg_array [i]) != 0){
printf ("pthread_create failed");
exit (3);
}
}
printf ("just after threads created\n");
/*********************************************************************/
/* WAIT FOR ALL CONSUMERS TO FINISH, SIGNAL WAITER WILL */
/* NOT FINISH UNLESS A SIGTERM ARRIVES AND WILL THEN EXIT */
/* THE ENTIRE PROCESS....OTHERWISE MAIN THREAD WILL EXIT */
/* THE PROCESS WHEN ALL CONSUMERS ARE FINISHED */
/*********************************************************************/
for (i = 1; i < NUMCONSUMERS + 1; i++) {
pthread_join (thread_id [i], NULL);
}
/*****************************************************************/
/* GET FINAL TIMESTAMP, CALCULATE ELAPSED SEC AND USEC */
/*****************************************************************/
gettimeofday (&last_time, (struct timezone *) 0);
if ((i = last_time.tv_sec - first_time.tv_sec) == 0)
j = last_time.tv_usec - first_time.tv_usec;
else{
if (last_time.tv_usec - first_time.tv_usec < 0) {
i--;
j = 1000000 +
(last_time.tv_usec - first_time.tv_usec);
} else {
j = last_time.tv_usec - first_time.tv_usec; }
}
printf ("Elapsed consumer time is %d sec and %d usec\n", i, j);
printf ("\n\n ALL CONSUMERS FINISHED, KILLING PROCESS\n\n");
exit (0);
}
/*********************************************/
/* INITIAL PART OF PRODUCER..... */
/*********************************************/
void *producer (void *arg)
{
int i, j, k;
int rand_donuts;
unsigned short xsub1 [3];
struct timeval randtime;
int threadid = pthread_self ();
FILE *prod_file ;
char prod_id[10];
char prod_filename[20]="prod_";
gettimeofday (&randtime, (struct timezone *) 0);
xsub1 [0] = (unsigned short )randtime.tv_usec;
xsub1 [1] = (unsigned short ) (randtime.tv_usec >> 16);
xsub1 [2] = (unsigned short ) (pthread_self);
while (1)
{
rand_donuts = nrand48 (xsub1) & 3;
pthread_mutex_lock (&prod [rand_donuts]);
/** check if there is space available to produce donuts **/
while (space_count [rand_donuts] == 0)
{
pthread_cond_wait (&prod_cond [rand_donuts], &prod [rand_donuts]);
}
serial[rand_donuts] ++;
fprintf(prod_file,"thread %d is producing donuts of type :%d , serial number of donut: %d \n",threadid,rand_donuts,serial[rand_donuts]);
shared_ring.flavor[rand_donuts][shared_ring.in_ptr[rand_donuts]]=serial[rand_donuts];
/** Increment in_ptr modulo NUMSLOTS **/
shared_ring.in_ptr[rand_donuts] = (shared_ring.in_ptr[rand_donuts]+1) % NUMSLOTS ;
/** Decrement the space count **/
space_count [rand_donuts] --;
pthread_mutex_unlock (&prod [rand_donuts]);
/* get cons mutex */
/* inc donut count */
/* unlock cons mutex */
/* signal cons_condx_var */
pthread_mutex_lock(&cons [rand_donuts]);
donut_count [rand_donuts]++;
pthread_mutex_unlock(&cons [rand_donuts]);
pthread_cond_signal(&cons_cond[rand_donuts]);
usleep(10000);
}
fclose(prod_file);
return NULL;
}
/*********************************************/
/* CONSUMER THREAD ROUTINE                   */
/*********************************************/
/*
 * Collects ten dozen donuts.  For each dozen it takes twelve donuts of
 * randomly chosen flavors from the shared rings (waiting whenever the
 * chosen flavor is empty), then appends a table of the serial numbers it
 * received to the file "cons_<id>".
 *
 * arg: pointer to an int consumer id; must not be NULL.
 *
 * Returns NULL.  If the log file cannot be opened the routine reports the
 * error on stderr and returns without consuming anything.
 */
void *consumer (void *arg)
{
    enum { DOZENS_PER_CONSUMER = 10, DONUTS_PER_DOZEN = 12 };
    int id = *(int *) arg;
    int dozen, k, m, flavor, outptr, donut, row_has_donut;
    size_t n;
    unsigned seed;
    unsigned short xsub [3];
    unsigned char self_bytes [sizeof (pthread_t)];
    struct timeval randtime;
    pthread_t self = pthread_self ();
    int taken [NUMFLAVORS];                         /* donuts held per flavor */
    int collection [NUMFLAVORS][DONUTS_PER_DOZEN];  /* serials; -1 means empty */
    FILE *cons_file;
    char cons_filename [32];
    char timestamp [64];
    time_t current_time;
    struct tm time_parts;

    snprintf (cons_filename, sizeof cons_filename, "cons_%d", id);
    cons_file = fopen (cons_filename, "a+");
    if (cons_file == NULL) {
        fprintf (stderr, "Cannot open %s\n", cons_filename);
        return NULL;
    }

    /* Seed the private generator from the clock plus a hash of this
       thread's identity.  pthread_t is opaque, so its bytes are folded
       rather than cast to an integer. */
    gettimeofday (&randtime, NULL);
    memcpy (self_bytes, &self, sizeof self_bytes);
    seed = (unsigned) id;
    for (n = 0; n < sizeof self_bytes; n++) {
        seed = seed * 31u + self_bytes [n];
    }
    xsub [0] = (unsigned short) randtime.tv_usec;
    xsub [1] = (unsigned short) (randtime.tv_usec >> 16);
    xsub [2] = (unsigned short) seed;

    for (dozen = 0; dozen < DOZENS_PER_CONSUMER; dozen++) {
        /* Start each dozen with an empty collection. */
        for (k = 0; k < NUMFLAVORS; k++) {
            taken [k] = 0;
            for (m = 0; m < DONUTS_PER_DOZEN; m++) {
                collection [k][m] = -1;
            }
        }

        for (m = 0; m < DONUTS_PER_DOZEN; m++) {
            flavor = (int) (nrand48 (xsub) % NUMFLAVORS);

            /* Consumer side: wait for a donut of this flavor and take it. */
            pthread_mutex_lock (&cons [flavor]);
            while (donut_count [flavor] == 0) {
                pthread_cond_wait (&cons_cond [flavor], &cons [flavor]);
            }
            outptr = shared_ring.outptr [flavor];
            donut = shared_ring.flavor [flavor][outptr];
            shared_ring.outptr [flavor] = (outptr + 1) % NUMSLOTS;
            donut_count [flavor]--;
            pthread_mutex_unlock (&cons [flavor]);

            /* At most DONUTS_PER_DOZEN donuts are taken per dozen, so the
               per-flavor index cannot run past the row. */
            collection [flavor][taken [flavor]++] = donut;

            /* Producer side: hand the freed slot back only after it was read. */
            pthread_mutex_lock (&prod [flavor]);
            space_count [flavor]++;
            pthread_mutex_unlock (&prod [flavor]);
            pthread_cond_signal (&prod_cond [flavor]);
        }

        /* localtime_r/strftime keep the timestamp thread-safe. */
        current_time = time (NULL);
        if (localtime_r (&current_time, &time_parts) == NULL ||
            strftime (timestamp, sizeof timestamp,
                      "%a %b %d %H:%M:%S %Y", &time_parts) == 0) {
            snprintf (timestamp, sizeof timestamp, "unknown time");
        }

        fprintf (cons_file, "Consumer %d finished collecting dozen#: %d \n", id, dozen);
        fprintf (cons_file, "Consumer thread#: %d , time: %s, Dozen#: %d\n",
                 id, timestamp, dozen);
        fprintf (cons_file, "\t plain \t jelly \t coconut \t honey-dip \t");
        fprintf (cons_file, "\n");

        /* One row per position; stop at the first row no flavor reached. */
        for (m = 0; m < DONUTS_PER_DOZEN; m++) {
            row_has_donut = 0;
            for (k = 0; k < NUMFLAVORS; k++) {
                if (collection [k][m] >= 0) {
                    row_has_donut = 1;
                }
            }
            if (!row_has_donut) {
                break;
            }
            for (k = 0; k < NUMFLAVORS; k++) {
                if (collection [k][m] >= 0) {
                    fprintf (cons_file, "\t %d \t", collection [k][m]);
                } else {
                    fprintf (cons_file, "\t \t");
                }
            }
            fprintf (cons_file, "\n");
        }

        fflush (cons_file);
        usleep (1000);      /* sleep 1 ms */
    }

    fprintf (cons_file, "Consumer %d finished collecting all 10 dozen donuts \n", id);
    fclose (cons_file);
    return NULL;
}
/***********************************************************/
/* SIGNAL WAITER THREAD ROUTINE                            */
/***********************************************************/
/*
 * Waits with sigwait() until SIGTERM or SIGINT is delivered, reports the
 * signal number and exits the whole process with status 1.  If sigwait()
 * itself fails, prints a message and exits with status 2.
 *
 * arg: unused.
 */
void *sig_waiter (void *arg)
{
    sigset_t wanted;
    int caught;

    sigemptyset (&wanted);
    sigaddset (&wanted, SIGTERM);
    sigaddset (&wanted, SIGINT);

    if (sigwait (&wanted, &caught) == 0) {
        printf ("process going down on SIGNAL (number %d)\n\n", caught);
        exit (1);
    }

    printf ("\n sigwait () failed, exiting \n");
    exit (2);
    return NULL;    /* not reached; keeps the signature satisfied */
}
/**********************************************************/
/* PTHREAD SYNCH SIGNAL HANDLER ROUTINE... */
/**********************************************************/
void sig_handler (int sig)
{
pthread_t signaled_thread_id;
int i, thread_index;
signaled_thread_id = pthread_self ();
for (i = 0; i < (NUMCONSUMERS + NUMPRODUCERS); i++) {
if (signaled_thread_id == thread_id [i]) {
thread_index = i;
break;
}
}
printf ("\nThread %d took signal # %d, PROCESS HALT\n",
thread_index, sig);
exit (1);
}
有人可以告訴我哪裏出錯了、需要做哪些修改嗎?提前致謝!!
「然而,它似乎並沒有正確運作」——這不是一個有效的問題描述。 –
好的。我似乎每次都會遇到死鎖。無論我把隊列深度設為多少,所有迴圈都會發生死鎖。 – user2830418
C++ 代碼在哪裏?(你真的指望有人會把 460 行代碼從頭讀完嗎?) – WhozCraig