2017-03-31 56 views
1

我試圖寫一個分佈式程序來做一件簡單的工作(檢查素數),但程序顯然會卡在某個阻塞等待上,我不知道爲什麼。它是一個生產者-消費者程序,並且必須爲每個生產者或消費者各創建一個線程。有人可以幫我嗎? PS:問題是間歇性出現的。我的代碼如下:(標題:C++分佈式程序阻塞)

#include <pthread.h> 
#include <iostream> 
#include <atomic> 
#include <time.h> 
#include <mutex> 
#include <semaphore.h> 
#include <cmath> 

using namespace std; 

// Configuration. Typed constants instead of object-like macros so the
// single-letter names obey normal scoping rules.
constexpr int N = 2;  //buffer size 
constexpr int Np = 1; //number of producer threads 
constexpr int Nc = 8; //number of consumer threads 

sem_t full, empty; //counting semaphores: filled slots / free slots 
pthread_mutex_t bufferbusy, finish; //bufferbusy guards numbers[]; finish guards updates of finished 
long numbers[N]; //shared buffer; a value of 0 marks a free slot 

// Both counters are read by threads that do not hold the mutex their writers
// hold (the worker loop conditions read M, main polls finished), so they are
// atomic to make those reads well-defined and visible.
std::atomic<int> finished{0}; //number of worker threads that have finished 
std::atomic<int> M{0}; //number of items consumed so far 

/*
 * Returns true when n is a prime number, false otherwise.
 *
 * Values below 2 (0, 1 and negatives) are not prime. Trial division runs
 * over every candidate divisor i with i*i <= n; the bound is written as
 * i <= n / i so the test includes the square root itself (4, 9, 25, ...)
 * and cannot overflow for large n.
 */
bool isPrime(long n){ 
    if (n < 2) 
        return false; 
    for (long i = 2; i <= n / i; ++i) 
        if (n % i == 0) 
            return false; 
    return true; 
} 

void *producer(){ 
    /*waits empty semaphore > 1 then puts a number in the buffer (with exclusive access)*/ 
    while(M<10000){ 
     sem_wait(&empty); //WAIT EMPTY 
     pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access 

     for (int i = 0 ; i < N ; i++){ //scrolls the buffer 
      if (numbers[i] == 0){ //if finds an empty buffer's position 
       numbers[i] = rand()%10000001+1; //fit this position with a random number 
       break; //leave the loop 
      }  
     } 

     pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex 
     sem_post(&full); //SIGNAL FULL 
    } 

    pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access 
    finished++; //count finished threads to measure the execution time 
    pthread_mutex_unlock(&finish); //unlock finish mutex 

    pthread_exit(NULL); //finish thread 
} 

void *consumer(){ 
    /*waits full semaphore > 1 then pick up a number from the buffer (with exclusive access) to check if it's prime*/ 
    long data; //store a buffer number 

    while(M<10000){ 
     sem_wait(&full); // WAIT FULL 
     pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access 

     for (int i = 0 ; i < N ; i++){ //scrolls the buffer 
      if (numbers[i] != 0){ //if finds a fited buffer position 
       data = numbers[i]; //save the number placed at this position 
       numbers[i] = 0; //clear the buffer position 
       M++; //increases the consume counter 
       break; //leave the loop 
      } 
     } 

     pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex 
     sem_post(&empty); //SIGNAL EMPTY 

     if(isPrime(data)); //checks if data number is prime 
      //cout << data << " is prime!" << endl; 
     //else 
      //cout << data << " is composite!" << endl; 
    } 

    pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access 
    finished++; //count finished threads to measure the execution time 
    pthread_mutex_unlock(&finish); //unlock finish mutex 

    pthread_exit(NULL); //finish thread 
} 

int main (int argc, char *argv[]){ 
    /*___________________________________VARIABLES___________________________________*/ 
    srand (time(NULL)); //seed to measure the execution time 
    pthread_mutex_init(&bufferbusy,0); //init the bufferbusy mutex with 0 
    pthread_mutex_init(&finish,0); //init the finish mutex with 0 
    sem_init(&full, 0, 0); //init the semaphore full(second parameter means that it is only visible by this process) 
    sem_init(&empty, 0, N); //init the smaphore empty 
    pthread_t threads[Np+Nc]; //threads declaration 
    int rc; //handle errors on thread creating 
    int t; //for loop 
    /*_______________________________________________________________________________*/ 

    /*_____________________________FILL BUFFER WITH ZEROS____________________________*/ 
    for (int i = 0; i < N; i++) //scrolls the buffer 
     numbers[i] = 0; //fill all positions with 0 
    /*_______________________________________________________________________________*/ 

    /*________________________CREATE AND EXECUTE MULTITHREADS________________________*/ 
    clock_t tStart = clock(), tFinish; //start timer 


    for(t=0; t<Np; t++){ //for each Np 
     rc = pthread_create(&threads[t], NULL, producer, NULL); //creates a producer thread 
     if (rc){ //handle error on thread creating 
      cout << "ERROR; return code from pthread_create() is" << rc << endl; 
      exit(-1); 
     } 
    } 

    for(t=Np; t<Np+Nc; t++){ //for each Nc 
     rc = pthread_create(&threads[t], NULL, consumer, NULL); //create a consumer thread 
     if (rc){ //handle error on thread creating 
      cout << "ERROR; return code from pthread_create() is" << rc << endl; 
      exit(-1); 
     } 
    } 

    while (finished < Np+Nc); //blocking wait until all threads have finished 


    tFinish = clock(); //stop timer 

    sem_destroy(&full); //semaphore destructor 
    sem_destroy(&empty); //semaphore destructor 
    pthread_mutex_destroy(&bufferbusy); //mutex destructor 
    pthread_mutex_destroy(&finish); //mutex destructor 

    cout << "Done!" << endl; 
    cout << "Execution time:" << (double)(tFinish - tStart)/CLOCKS_PER_SEC << endl; 
    pthread_exit(NULL); //finish last thread 
    /*_______________________________________________________________________________*/ 
} 

回答

0

問題已經解決了。只需在每個線程退出之前添加一次 sem_post(),讓其他仍被阻塞的線程也能退出,就足以解決問題。