1
我試圖做一個分佈式程序做一個簡單的工作(檢查素數),顯然程序正在一個阻塞的等待下降,我不知道爲什麼。它是一個生產者 - 消費者程序並且必須爲每個生產者呼叫或消費者呼叫創建一個線程。任何人都可以幫我嗎? Ps:問題是間歇性的。 我的代碼是波紋管:C++分佈式程序阻塞
#include <pthread.h>
#include <iostream>
#include <atomic>
#include <time.h>
#include <mutex>
#include <semaphore.h>
#include <cmath>
using namespace std;
#define N 2 //buffer size
#define Np 1 //number of producer threads
#define Nc 8 //number of consumer threads
sem_t full, empty; //semaphores declaration
pthread_mutex_t bufferbusy, finish; //mutex declaration
long numbers[N]; //buffer declaration
int finished = 0; //flag declaration
int M = 0; //counter declaration
bool isPrime(long n){
/*checks if long n is prime*/
for (int i = 2; i < sqrt(n) ; i++)
if (n % i == 0)
return false;
return true;
}
void *producer(){
/*waits empty semaphore > 1 then puts a number in the buffer (with exclusive access)*/
while(M<10000){
sem_wait(&empty); //WAIT EMPTY
pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access
for (int i = 0 ; i < N ; i++){ //scrolls the buffer
if (numbers[i] == 0){ //if finds an empty buffer's position
numbers[i] = rand()%10000001+1; //fit this position with a random number
break; //leave the loop
}
}
pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex
sem_post(&full); //SIGNAL FULL
}
pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access
finished++; //count finished threads to measure the execution time
pthread_mutex_unlock(&finish); //unlock finish mutex
pthread_exit(NULL); //finish thread
}
void *consumer(){
/*waits full semaphore > 1 then pick up a number from the buffer (with exclusive access) to check if it's prime*/
long data; //store a buffer number
while(M<10000){
sem_wait(&full); // WAIT FULL
pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access
for (int i = 0 ; i < N ; i++){ //scrolls the buffer
if (numbers[i] != 0){ //if finds a fited buffer position
data = numbers[i]; //save the number placed at this position
numbers[i] = 0; //clear the buffer position
M++; //increases the consume counter
break; //leave the loop
}
}
pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex
sem_post(&empty); //SIGNAL EMPTY
if(isPrime(data)); //checks if data number is prime
//cout << data << " is prime!" << endl;
//else
//cout << data << " is composite!" << endl;
}
pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access
finished++; //count finished threads to measure the execution time
pthread_mutex_unlock(&finish); //unlock finish mutex
pthread_exit(NULL); //finish thread
}
int main (int argc, char *argv[]){
/*___________________________________VARIABLES___________________________________*/
srand (time(NULL)); //seed to measure the execution time
pthread_mutex_init(&bufferbusy,0); //init the bufferbusy mutex with 0
pthread_mutex_init(&finish,0); //init the finish mutex with 0
sem_init(&full, 0, 0); //init the semaphore full(second parameter means that it is only visible by this process)
sem_init(&empty, 0, N); //init the smaphore empty
pthread_t threads[Np+Nc]; //threads declaration
int rc; //handle errors on thread creating
int t; //for loop
/*_______________________________________________________________________________*/
/*_____________________________FILL BUFFER WITH ZEROS____________________________*/
for (int i = 0; i < N; i++) //scrolls the buffer
numbers[i] = 0; //fill all positions with 0
/*_______________________________________________________________________________*/
/*________________________CREATE AND EXECUTE MULTITHREADS________________________*/
clock_t tStart = clock(), tFinish; //start timer
for(t=0; t<Np; t++){ //for each Np
rc = pthread_create(&threads[t], NULL, producer, NULL); //creates a producer thread
if (rc){ //handle error on thread creating
cout << "ERROR; return code from pthread_create() is" << rc << endl;
exit(-1);
}
}
for(t=Np; t<Np+Nc; t++){ //for each Nc
rc = pthread_create(&threads[t], NULL, consumer, NULL); //create a consumer thread
if (rc){ //handle error on thread creating
cout << "ERROR; return code from pthread_create() is" << rc << endl;
exit(-1);
}
}
while (finished < Np+Nc); //blocking wait until all threads have finished
tFinish = clock(); //stop timer
sem_destroy(&full); //semaphore destructor
sem_destroy(&empty); //semaphore destructor
pthread_mutex_destroy(&bufferbusy); //mutex destructor
pthread_mutex_destroy(&finish); //mutex destructor
cout << "Done!" << endl;
cout << "Execution time:" << (double)(tFinish - tStart)/CLOCKS_PER_SEC << endl;
pthread_exit(NULL); //finish last thread
/*_______________________________________________________________________________*/
}