2016-02-17 18 views
1

最近,我開始研究pthreads並試圖用pthreads實現軟件流水線操作。要做到這一點,我自己寫了一個玩具程序,其中一個類似的程序將成爲我主要項目的一部分。使用pthreads進行簡單流水線操作

因此,在這個程序中的主線程創建和輸入輸出緩衝器整數類型,然後創建單個主線程並傳遞給主線程這些緩衝區。 主線程依次創建兩個工作線程

輸入和從所述主傳遞給主線程輸出緩衝器是大小Ñ X ķ(例如尺寸爲int 5×10)的。 主線程針對n(即5)次重複遍歷大小爲k(即10)的塊。 在主線程k(這裏是5)的循環運行次數。在ķ每次迭代主線程確實大小Ñ的輸入數據的一部分的一些操作,並將其放置在主工作線程之間共享的共同緩衝主線程然後通知工作線程數據已被放置在公共緩衝區中。

兩個工作線程等待來自主線程信號如果公共緩衝器已準備就緒。 公用緩衝區上的操作分爲工作線程中的一半。這意味着一個工作線程將在上半年工作,而另一個工作線程將工作在的下一半通用緩衝區。 一旦工作線程主線程得到信號,每個工作線程的做他們的一半的數據的一些操作,並將其複製到輸出緩衝。然後工作線程通過設置標誌值來通知主線程它們的操作在公用緩衝器上完成。爲工作線程創建了一組標誌。該主線程不斷,如果所有的標誌都設置這基本上意味着所有工作線程說完就公共緩衝區他們的操作等主線程可以將下一數據塊到公共緩衝區檢查安全地爲工作線程的消耗。

所以基本上有以流水線的方式在工作線程之間的通信。最後,我在主線程中輸出輸出緩衝區。但是我根本沒有輸出。我有複製粘貼我的代碼幾乎所有步驟的完整評論。

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <sys/types.h> 
#include <sys/time.h> 
#include <semaphore.h> 
#include <unistd.h> 
#include <stdbool.h> 
#include <string.h> 

#define MthNum 1 //Number of Master threads 
#define WthNum 2 //Number of Worker threads 
#define times 5 // Number of times the iteration (n in the explanation) 
#define elNum 10 //Chunk size during each iteration (k in the explanation) 

pthread_mutex_t mutex; // mutex variable declaration 
pthread_cond_t cond_var; //conditional variarble declaration 
bool completion_flag = true; //This global flag indicates the completion of the worker thread. Turned false once all operation ends 
          //marking the completion 
int *commonBuff; //common buffer between master and worker threads 
int *commFlags; //array of flags that are turned to 1 by each worker threads. So worker thread i turns commFlags[i] to 1 
       // the master thread turns commFlags[i] = 0 for i =0 to (WthNum - 1) 
int *commFlags_s; 
int counter; // This counter used my master thread to count if all the commFlags[i] that shows 
      //all the threads finished their work on the common buffer 
// static pthread_barrier_t barrier; 
// Arguments structure passed to master thread 
typedef struct{ 
    int *input; // input buffer 
    int *output;// output buffer 
}master_args; 

// Arguments structure passed to worker thread 
typedef struct{ 
    int threadId; 
    int *outBuff; 
}worker_args; 

void* worker_func(void *arguments); 
void *master_func(void *); 

int main(int argc,char*argv[]){ 

    int *ipData,*opData; 
    int i,j; 

    // allocation of input buffer and initializing to 0 
    ipData = (int *)malloc(times*elNum*sizeof(int)); 
    memset(ipData,0,times*elNum*sizeof(int)); 

    // allocation of output buffer and initializing to 0 
    opData = (int *)malloc(times*elNum*sizeof(int)); 
    memset(opData,0,times*elNum*sizeof(int)); 

    pthread_t thread[MthNum]; 
    master_args* args[MthNum]; 


    //creating the single master thread and passing the arguments 
    for(i=0;i<MthNum;i++){ 
     args[i] = (master_args *)malloc(sizeof(master_args)); 
     args[i]->input= ipData; 
     args[i]->output= opData; 
     pthread_create(&thread[i],NULL,master_func,(void *)args[i]); 
    } 

    //joining the master thred 
    for(i=0;i<MthNum;i++){ 
     pthread_join(thread[i],NULL); 
    } 

    //printing the output buffer values 
    for(j =0;j<times;j++){ 
     for(i =0;i<elNum;i++){ 
      printf("%d\t",opData[i+j*times]); 
     } 
     printf("\n"); 
    } 

    return 0; 
} 

//This is the master thread function 
void *master_func(void *arguments){ 

    //copying the arguments pointer to local variables 
    master_args* localMasterArgs = (master_args *)arguments; 
    int *indataArgs = localMasterArgs->input; //input buffer 
    int *outdataArgs = localMasterArgs->output; //output buffer 

    //worker thread declaration 
    pthread_t Workers[WthNum]; 
    //worker thread arguments declaration 
    worker_args* wArguments[WthNum]; 
    int i,j; 

    pthread_mutex_init(&mutex, NULL); 
    pthread_cond_init (&cond_var, NULL); 
    counter =0; 

    commonBuff = (int *)malloc(elNum*sizeof(int)); 

    commFlags = (int *)malloc(WthNum*sizeof(int)); 
    memset(commFlags,0,WthNum*sizeof(int)); 
    commFlags_s= (int *)malloc(WthNum*sizeof(int)); 
    memset(commFlags_s,0,WthNum*sizeof(int)); 

    for(i =0;i<WthNum;i++){ 

     wArguments[i] = (worker_args*)malloc(sizeof(worker_args)); 
     wArguments[i]->threadId = i; 
     wArguments[i]->outBuff = outdataArgs; 

     pthread_create(&Workers[i],NULL,worker_func,(void *)wArguments[i]); 
    } 

    for (i = 0; i < times; i++) { 
     for (j = 0; j < elNum; j++) 
      indataArgs[i + j * elNum] = i + j; 

     while (counter != 0) { 
      counter = 0; 

      pthread_mutex_lock(&mutex); 
      for (j = 0; j < WthNum; j++) { 
       counter += commFlags_s[j]; 
      } 
      pthread_mutex_unlock(&mutex); 

     } 
     pthread_mutex_lock(&mutex); 
     memcpy(commonBuff, &indataArgs[i * elNum], sizeof(int)); 
     pthread_mutex_unlock(&mutex); 
     counter = 1; 
     while (counter != 0) { 
      counter = 0; 

      pthread_mutex_lock(&mutex); 
      for (j = 0; j < WthNum; j++) { 
       counter += commFlags[j]; 
      } 
      pthread_mutex_unlock(&mutex); 


     } 
     // printf("master broad cast\n"); 
     pthread_mutex_lock(&mutex); 
     pthread_cond_broadcast(&cond_var); 
     //releasing the lock 
     pthread_mutex_unlock(&mutex); 

    } 

    pthread_mutex_lock(&mutex); 
    completion_flag = false; 
    pthread_mutex_unlock(&mutex); 

    for (i = 0; i < WthNum; i++) { 
     pthread_join(Workers[i], NULL); 
    } 

    pthread_mutex_destroy(&mutex); 
    pthread_cond_destroy(&cond_var); 

    return NULL; 
} 


void* worker_func(void *arguments){ 

    worker_args* localArgs = (worker_args*)arguments; 

    //copying the thread ID and the output buffer 
    int tid = localArgs->threadId; 
    int *localopBuffer = localArgs->outBuff; 
    int i,j; 
    bool local_completion_flag=false; 

    while(local_completion_flag){ 

     pthread_mutex_lock(&mutex); 
     commFlags[tid] =0; 
     commFlags_s[tid] =1; 
     pthread_cond_wait(&cond_var,&mutex); 
     commFlags_s[tid] =0; 
     commFlags[tid] =1; 
     if (tid == 0) { 
      for (i = 0; i < (elNum/2); i++) { 
       localopBuffer[i] = commonBuff[i] * 5; 
      } 
     } else { // Thread ID 1 operating on the other half of the common buffer data and placing on the 
       // output buffer 
      for (i = 0; i < (elNum/2); i++) { 
       localopBuffer[elNum/2 + i] = commonBuff[elNum/2 + i] * 10; 
      } 
     } 
     local_completion_flag=completion_flag; 
     pthread_mutex_unlock(&mutex);//releasing the lock 

    } 

    return NULL; 
} 

但我不知道在哪裏我在我的實現,因爲邏輯上似乎是正確的做了錯事。但是我的執行過程肯定有問題。我花了很長時間嘗試不同的事情來解決它,但沒有任何工作。對不起,這篇文章很長,但是我無法確定哪一部分我可能做錯了,所以我無法簡化這篇文章。因此,如果任何人都可以看看問題和實施情況,並且可以建議需要做什麼修改才能按預期運行,那麼這將會非常有幫助。感謝您的幫助和幫助。

+0

是'counter'應該由互斥體進行保護或不?似乎有很多情況下,如果您不通過廣播c.v來修改'counter'而不保留mutex或將'counter'設置爲零。 - 這些事情可能導致等待已經發生的事情。 –

+0

此外,我建議等待6的聲譽量子時間線程睡30英寸的使用例如18 – dewelloper

+0

@DavidSchwartz抱歉,我遲到的答覆。計數器將計算工作線程將打開的標誌數量。主線程測量工作線程變爲1的標誌並增加計數器值。所以如果計數器值等於線程數,那麼while循環會退出並繼續前進,所以計數器不會被工作線程修改。該計數器僅由主線程修改。它將值傳遞給工作線程,因爲等待計數器值的工作線程爲0.我不確定我是否需要互斥量 – duttasankha

回答

0

這段代碼有幾處錯誤。

  1. 你可以從固定的工作線程的創建開始:

    wArguments[i] = (worker_args*)malloc(sizeof(worker_args)); 
    wArguments[i]->threadId = i; 
    wArguments[i]->outBuff = outdataArgs; 
    
    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments); 
    

要初始化worker_args結構,但不正確的 - 傳遞指針數組(void *)wArguments,而不是指向你只是初始化數組元素。

void *master_func(void *arguments) 
{ 
/* (...) */ 
pthread_mutex_init(&mutex, NULL); 
pthread_cond_init (&cond_var, NULL); 
counter = WthNum; 
  • 當開始主線程,則錯誤地傳遞指針指針:

    pthread_create(&Workers[i],NULL,worker_func, (void *)wArguments[i]); 
    //                ^^^ 
    
    開始使用它的值線程之前
  • 初始化計數器

    pthread_create(&thread[i],NULL,master_func,(void *)&args[i]); 
    
  • 請更改爲:

    pthread_create(&thread[i],NULL,master_func,(void *) args[i]); 
    
  • 所有訪問counter變量(如任何其他共享存儲器)必須在線程之間同步。
  • +0

    你好 謝謝你的回覆。我包括了這些變化,但該計劃仍然沒有按預期運行。我認爲該計劃正在無限循環中等待,而且還沒有終止。我是否需要使用互斥鎖初始化計數器 – duttasankha

    +0

    @duttasankha是的,當然 - 訪問共享變量必須在所有執行路徑之間同步 – 4pie0

    +0

    我將計數器置於互斥鎖中。但它仍然不起作用。 – duttasankha