2013-10-17 60 views
-1

我需要解決生產者-消費者問題,而且必須採用單一進程、以進程內多個線程實現的方案。該進程的主例程應在全局內存中建立並初始化所需的環形緩衝區和控制變量,使進程內的所有其他線程都能看到它們。之後主例程會創建若干生產者線程和若干消費者線程。(標題:使用環形緩衝區和線程的生產者-消費者)

我在互聯網上做了大量的研究,並寫出了一些代碼。然而它似乎無法正確運行:無論我設定什麼數值,程序都會陷入死鎖。有人可以幫我嗎?

我的頭文件:

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/signal.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#define   NUMFLAVORS   4 
#define   NUMSLOTS   400 
#define   NUMCONSUMERS  50 
#define  NUMPRODUCERS  30 
#define  NoOfDozens  200 

struct DONUT_SHOP { 
    int   flavor [NUMFLAVORS] [NUMSLOTS]; 
    int   outptr [NUMFLAVORS]; 
    int  in_ptr [NUMFLAVORS]; 
    int  serial [NUMFLAVORS]; 
}; 

我的程序:

void *sig_waiter (void *arg); 
    void *producer (void *arg); 
    void *consumer (void *arg) ; 
    void sig_handler (int); 



/****************************/ 
/*  GLOBAL VARIABLES */ 
/****************************/ 

#include "ml_pc.h" 

    struct DONUT_SHOP shared_ring; 
    int   space_count [NUMFLAVORS];  
    int   donut_count [NUMFLAVORS];   
    int   serial [NUMFLAVORS];    
    pthread_mutex_t  prod [NUMFLAVORS]; 
    pthread_mutex_t  cons [NUMFLAVORS]; 
    pthread_cond_t  prod_cond [NUMFLAVORS]; 
    pthread_cond_t  cons_cond [NUMFLAVORS]; 
    pthread_t  thread_id [NUMCONSUMERS+1], sig_wait_id; 


int main (int argc, char *argv[]) 
{ 
    int   i, j, k, nsigs; 
    struct timeval  randtime, first_time, last_time; 
    struct sigaction new_act; 
    int   arg_array[NUMCONSUMERS]; 
    sigset_t  all_signals; 
    int sigs[]  = { SIGBUS, SIGSEGV, SIGFPE }; 
    pthread_attr_t  thread_attr; 
    struct sched_param sched_struct; 






/**********************************************************************/ 
/* INITIAL TIMESTAMP VALUE FOR PERFORMANCE MEASURE     */ 
/**********************************************************************/ 

     gettimeofday (&first_time, (struct timezone *) 0); 


     for (i = 0; i < NUMCONSUMERS + 1 ; i++) { 
      arg_array [i] = i+1; /** SET ARRAY OF ARGUMENT VALUES **/ 
     } 

/**********************************************************************/ 
/* GENERAL PTHREAD MUTEX AND CONDITION INIT AND GLOBAL INIT   */ 
/**********************************************************************/ 

    for (i = 0; i < NUMFLAVORS; i++) { 
     pthread_mutex_init (&prod [i], NULL); 
     pthread_mutex_init (&cons [i], NULL); 
     pthread_cond_init (&prod_cond [i], NULL); 
     pthread_cond_init (&cons_cond [i], NULL); 
     shared_ring.outptr [i]  = 0; 
     shared_ring.in_ptr [i]  = 0; 
     shared_ring.serial [i]  = 0; 
     space_count [i]   = NUMSLOTS; 
     donut_count [i]   = 0; 
    } 



/**********************************************************************/ 
/* SETUP FOR MANAGING THE SIGTERM SIGNAL, BLOCK ALL SIGNALS   */ 
/**********************************************************************/ 

    sigfillset (&all_signals); 
    nsigs = sizeof (sigs)/sizeof (int); 
    for (i = 0; i < nsigs; i++){ 
      sigdelset (&all_signals, sigs [i]); 
     } 
    sigprocmask (SIG_BLOCK, &all_signals, NULL); 
    sigfillset (&all_signals); 
    for(i = 0; i < nsigs; i++) { 
      new_act.sa_handler = sig_handler; 
      new_act.sa_mask  = all_signals; 
      new_act.sa_flags = 0; 
      if (sigaction (sigs[i], &new_act, NULL) == -1){ 
        perror("can't set signals: "); 
        exit(1); 
      } 
    } 

    printf ("just before threads created\n"); 


/*********************************************************************/ 
/* CREATE SIGNAL HANDLER THREAD, PRODUCER AND CONSUMERS    */ 
/*********************************************************************/ 

     if (pthread_create (&sig_wait_id, NULL, 
        sig_waiter, NULL) != 0){ 

       printf ("pthread_create failed "); 
       exit (3); 
     } 

     pthread_attr_init (&thread_attr); 
     pthread_attr_setinheritsched (&thread_attr, 
     PTHREAD_INHERIT_SCHED); 

     #ifdef GLOBAL 

     sched_struct.sched_priority = sched_get_priority_max(SCHED_OTHER); 
     pthread_attr_setinheritsched (&thread_attr, 
     PTHREAD_EXPLICIT_SCHED); 
     pthread_attr_setschedpolicy (&thread_attr, SCHED_OTHER); 
     pthread_attr_setschedparam (&thread_attr, &sched_struct); 
     pthread_attr_setscope (&thread_attr, 
     PTHREAD_SCOPE_SYSTEM); 
     #endif 

    if (pthread_create (&thread_id[0], &thread_attr, 
         producer, NULL) != 0) { 
     printf ("pthread_create failed "); 
     exit (3); 
    } 

    for (i = NUMPRODUCERS; i < NUMCONSUMERS + 1; i++) { 
     if (pthread_create (&thread_id [i], &thread_attr, 
       consumer, (void *)&arg_array [i]) != 0){ 
      printf ("pthread_create failed"); 
      exit (3); 
     } 
    } 

    printf ("just after threads created\n"); 

/*********************************************************************/ 
/* WAIT FOR ALL CONSUMERS TO FINISH, SIGNAL WAITER WILL    */ 
/* NOT FINISH UNLESS A SIGTERM ARRIVES AND WILL THEN EXIT   */ 
/* THE ENTIRE PROCESS....OTHERWISE MAIN THREAD WILL EXIT    */ 
/* THE PROCESS WHEN ALL CONSUMERS ARE FINISHED      */ 
/*********************************************************************/ 

    for (i = 1; i < NUMCONSUMERS + 1; i++) { 
        pthread_join (thread_id [i], NULL); 
    } 

/*****************************************************************/ 
/* GET FINAL TIMESTAMP, CALCULATE ELAPSED SEC AND USEC   */ 
/*****************************************************************/ 


    gettimeofday (&last_time, (struct timezone *) 0); 
      if ((i = last_time.tv_sec - first_time.tv_sec) == 0) 
      j = last_time.tv_usec - first_time.tv_usec; 
      else{ 
      if (last_time.tv_usec - first_time.tv_usec < 0) { 
       i--; 
       j = 1000000 + 
        (last_time.tv_usec - first_time.tv_usec); 
       } else { 
       j = last_time.tv_usec - first_time.tv_usec; } 
        } 
    printf ("Elapsed consumer time is %d sec and %d usec\n", i, j); 

    printf ("\n\n ALL CONSUMERS FINISHED, KILLING PROCESS\n\n"); 
    exit (0); 
} 

/*********************************************/ 
/* INITIAL PART OF PRODUCER.....    */ 
/*********************************************/ 

void *producer (void *arg) 
{ 
    int  i, j, k; 
    int   rand_donuts; 
    unsigned short xsub1 [3]; 
    struct timeval randtime; 
    int threadid = pthread_self (); 
    FILE *prod_file ; 
    char prod_id[10]; 
    char prod_filename[20]="prod_"; 
    gettimeofday (&randtime, (struct timezone *) 0); 
    xsub1 [0] = (unsigned short )randtime.tv_usec; 
    xsub1 [1] = (unsigned short ) (randtime.tv_usec >> 16); 
    xsub1 [2] = (unsigned short ) (pthread_self);  

    while (1) 
    { 
     rand_donuts = nrand48 (xsub1) & 3; 

     pthread_mutex_lock (&prod [rand_donuts]); 

     /** check if there is space available to produce donuts **/ 

    while (space_count [rand_donuts] == 0) 
    { 
       pthread_cond_wait (&prod_cond [rand_donuts], &prod [rand_donuts]); 
     } 


    serial[rand_donuts] ++; 

    fprintf(prod_file,"thread %d is producing donuts of type :%d , serial number of donut: %d \n",threadid,rand_donuts,serial[rand_donuts]); 

    shared_ring.flavor[rand_donuts][shared_ring.in_ptr[rand_donuts]]=serial[rand_donuts]; 

    /** Increment in_ptr modulo NUMSLOTS **/   
    shared_ring.in_ptr[rand_donuts] = (shared_ring.in_ptr[rand_donuts]+1) % NUMSLOTS ; 

    /** Decrement the space count **/ 
    space_count [rand_donuts] --; 

     pthread_mutex_unlock (&prod [rand_donuts]); 

    /* get cons mutex */     
    /* inc donut count */    
    /* unlock cons mutex */   
    /* signal cons_condx_var */ 

    pthread_mutex_lock(&cons [rand_donuts]); 

    donut_count [rand_donuts]++; 

    pthread_mutex_unlock(&cons [rand_donuts]); 

    pthread_cond_signal(&cons_cond[rand_donuts]); 

    usleep(10000); 
    } 
    fclose(prod_file); 
    return NULL; 
} 



/*********************************************/ 
/* Consumer Code ,starts here */ 
/*********************************************/ 



void *consumer (void *arg) 
{ 
    int  i, j, k, m, id ,rand_donuts,outptr,numofdonut; 
    unsigned short xsub [3]; 
    struct timeval randtime; 

    int printresult; 
    int  serial [NUMFLAVORS];     
    int collection [NUMFLAVORS][12];  
    FILE *cons_file; 
    char fileid[10]; 
    char cons_filename[10]="cons_"; 


    time_t current_time; 
    id = *(int *) arg; 

    sprintf(fileid, "%d", id) ;  
    strcat(cons_filename,fileid); 



    gettimeofday (&randtime, (struct timezone *) 0); 
    xsub [0] = (unsigned short)randtime.tv_usec; 
    xsub [1] = (unsigned short) (randtime.tv_usec >> 16); 
    xsub [2] = (unsigned short) (pthread_self); 



    if ((cons_file = fopen(cons_filename, "a+")) == NULL) 
      fprintf(stderr, "Cannot open %s\n", "cons_file"); 

    for(i = 0; i < 10; i++) 
    { 


     for(k = 0; k < NUMFLAVORS; k++) 
     { 
        for(m = 0; m < 12; m++) 
      { 
        collection[k][m] = -1; 
} 

     } 



     for(k = 0; k < NUMFLAVORS; k++) 
     { 
      serial [k] = 0; 
     } 


     for(m = 0; m < 12; m++) 
     { 

      rand_donuts = nrand48(xsub) & 3; 


     /*******************************************************   
     get cons mutex 
     check donut count 
     loop: 
     if donut count == 0 
     wait cons_condx_var 
     *******************************************************/ 

      pthread_mutex_lock (&cons [rand_donuts]); 
      while (donut_count [rand_donuts] == 0) 
      { 
       pthread_cond_wait (&cons_cond [rand_donuts], &cons [rand_donuts]); 
      } 

     /************************************************  
      Consume donut 
     ************************************************/ 




      outptr = shared_ring.outptr[rand_donuts]; 
      numofdonut = shared_ring.flavor[rand_donuts][outptr]; 
      collection[rand_donuts][serial[rand_donuts]]=numofdonut ; 
      serial[rand_donuts] ++; 

      outptr ++; 
      shared_ring.outptr[rand_donuts] = outptr % NUMSLOTS; 

      donut_count [rand_donuts] --; 



      pthread_mutex_unlock (&cons [rand_donuts]); 



      /* lock producer mutex */ 
      pthread_mutex_lock(&prod [rand_donuts]); 

      /*increment the space count*/ 
      space_count [rand_donuts] ++; 

      /* unlock producer mutex */ 
      pthread_mutex_unlock(&prod [rand_donuts]); 

      //signal the producer 
      pthread_cond_signal(&prod_cond[rand_donuts]); 

      } 




       current_time=time(NULL); 
      fprintf(cons_file,"Consumer %d finished collecting dozen#: %d \n",id,i); 
      fprintf(cons_file,"Consumer thread#: %d , time: %s, Dozen#: %d",id,asctime(localtime(&current_time)),i); 
      fprintf(cons_file,"\t plain \t jelly \t coconut \t honey-dip \t"); 
      fprintf(cons_file,"\n"); 

      printresult=0; 
for (i = 1; i < NoOfDozens; i++) 
{ 
      for(m = 0; m < 12 && printresult <=3 ; m++) 
      { 
       printresult = 0; 
       for(k = 0; k < NUMFLAVORS ; k++) 
       {    
        if(collection[k][m] >= 0) 
        { 
         fprintf(cons_file,"\t %d \t",collection[k][m]); 
         printresult = 0; 
        } 
        else 
        { 
         fprintf(cons_file,"\t \t"); 
         printresult =printresult+1; 
        }     

       } 
       fprintf(cons_file,"\n");     
      } 

} 


/*Sleeping*/ 

    fflush(cons_file); 
     usleep(1000); /* sleep 1 ms */ 
     } 


     fprintf(cons_file,"Consumer %d finished collecting all 10 dozen donuts \n",id); 
     fclose(cons_file); 
     return NULL; 
} 

/***********************************************************/ 
/* PTHREAD ASYNCH SIGNAL HANDLER ROUTINE...    */ 
/***********************************************************/ 

void *sig_waiter (void *arg) 
{ 
    sigset_t sigterm_signal; 
    int  signo; 

    sigemptyset (&sigterm_signal); 
    sigaddset (&sigterm_signal, SIGTERM); 
    sigaddset (&sigterm_signal, SIGINT); 

    if (sigwait (&sigterm_signal, & signo) != 0) { 
     printf ("\n sigwait () failed, exiting \n"); 
     exit(2); 
    } 
    printf ("process going down on SIGNAL (number %d)\n\n", signo); 
    exit (1); 
    return NULL; 
} 


/**********************************************************/ 
/* PTHREAD SYNCH SIGNAL HANDLER ROUTINE...    */ 
/**********************************************************/ 

void sig_handler (int sig) 
{ 
    pthread_t signaled_thread_id; 
    int  i, thread_index; 

    signaled_thread_id = pthread_self (); 
    for (i = 0; i < (NUMCONSUMERS + NUMPRODUCERS); i++) { 
     if (signaled_thread_id == thread_id [i]) { 
       thread_index = i; 
       break; 
     } 
    } 
    printf ("\nThread %d took signal # %d, PROCESS HALT\n", 
       thread_index, sig); 
    exit (1); 
} 

有人可以告訴我哪裏出錯了、需要做哪些修改嗎?提前致謝!!

+0

「然而,它似乎無法正確運行」——這不是一個有效的問題描述。 –

+0

好的。程序似乎總是陷入死鎖:無論我採用哪一種隊列深度,所有循環都會發生死鎖。 – user2830418

+0

C++代碼在哪裏? (你是否真的希望有人能夠穿過460行代碼?) – WhozCraig

回答

0

prod_file未初始化。

// int threadid = pthread_self (); 
pthread_t threadid = pthread_self (); 

// prod_file not initialized 
// FILE *prod_file ; 
FILE *prod_file = stdout; 

// fprintf(prod_file,"thread %d is ... \n",threadid,...); 
fprintf(prod_file,"thread %lX is ... \n",(unsigned long) threadid); 

輸出

thread 80051EE0 is producing donuts of type :1 , serial number of donut: 78 
thread 80051EE0 is producing donuts of type :0 , serial number of donut: 83 
thread 80051EE0 is producing donuts of type :2 , serial number of donut: 82 
thread 80051EE0 is producing donuts of type :1 , serial number of donut: 79 
thread 80051EE0 is producing donuts of type :1 , serial number of donut: 80 
thread 80051EE0 is producing donuts of type :3 , serial number of donut: 96 
thread 80051EE0 is producing donuts of type :0 , serial number of donut: 84 
thread 80051EE0 is producing donuts of type :0 , serial number of donut: 85 
Elapsed consumer time is 5 sec and 335063 usec 


ALL CONSUMERS FINISHED, KILLING PROCESS 

其他

1. 問題中的代碼在發佈前確實應該先精簡到最小範圍。

2可以使用更好的格式。

3很多未使用的變量。

// rand_donuts = nrand48 (xsub1) & 3 
rand_donuts = nrand48 (xsub1) % NUMFLAVORS 

// char fileid[10]; 
// char cons_filename[10]="cons_"; 
char fileid[3*sizeof(int) + 3]; // Big enough for any int 
// char cons_filename[5 + sizeof fileid]="cons_"; 
相關問題