2013-08-23 65 views
2

問題: 我必須增加x1和x2變量,這應該由單獨的線程完成,並且兩個變量的下一個增量不應該被調用,直到兩個變量的前一個增量未完成爲止。在這個解決方案中信號量的使用是正確的嗎?

Problem

建議的解決方案: 初始化4旗語並調用單獨的線程變量的單獨的增量。 2個信號量用於將消息傳遞給啓動遞增的線程,以及2個用於將消息傳遞給主線程的信號量,以實現遞增完成。主線程將等待來自兩個子線程的信號量發佈,顯示兩個變量的增量都已完成,然後主線程將消息傳遞給兩個子線程,以允許進一步遞增。

這一個目前正在對我罰款。但是,一個人能提出更好的解決方案嗎?或者,任何人都可以指出這個解決方案的問題? 任何幫助將不勝感激?提前致謝。

解決方案代碼:

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 

//Threads 
pthread_t pth1,pth2; 

//Values to calculate 
int x1 = 0, x2 = 0; 

sem_t c1,c2,c3,c4; 

void *threadfunc1(void *parm) 
{ 
    for (;;) { 
     x1++; 
     sem_post(&c1); 
     sem_wait(&c3); 
    } 
    return NULL ; 
} 

void *threadfunc2(void *parm) 
{ 
    for (;;) { 
     x2++; 
     sem_post(&c2); 
     sem_wait(&c4); 
    } 
    return NULL ; 
} 

int main() { 
    sem_init(&c1, 0, 0); 
    sem_init(&c2, 0, 0); 
    sem_init(&c3, 0, 0); 
    sem_init(&c4, 0, 0); 
    pthread_create(&pth1, NULL, threadfunc1, "foo"); 
    pthread_create(&pth2, NULL, threadfunc2, "foo"); 
    sem_wait(&c1); 
    sem_wait(&c2); 
    sem_post(&c3); 
    sem_post(&c4); 
    int loop = 0; 
    while (loop < 8) { 
     // iterated as a step 
     loop++; 
     printf("Initial : x1 = %d, x2 = %d\n", x1, x2); 
     sem_wait(&c1); 
     sem_wait(&c2); 
     printf("Final : x1 = %d, x2 = %d\n", x1, x2); 
     sem_post(&c3); 
     sem_post(&c4); 
    } 
    sem_wait(&c1); 
    sem_wait(&c2); 
    sem_destroy(&c1); 
    sem_destroy(&c2); 
    sem_destroy(&c3); 
    sem_destroy(&c4); 
    printf("Result : x1 = %d, x2 = %d\n", x1, x2); 
    pthread_cancel(pth1); 
    pthread_cancel(pth2); 
    return 1; 
} 
+0

唯一的問題,我可以找出,sem_post和sem_wait是耗時.. –

+0

我假設這是一個sanitized的例子,真正的程序不僅僅是增加計數器?特別的,線程的唯一目的是讓你一次做兩件事。有意將它們鎖定在這種鎖步形式中會破壞它們的整個目的。 –

+0

是的,大圖中,x2可能依賴於x1,需要最後的x1值在特定的步驟求解計算。它是一種求解器,在每次迭代中執行step解決方案@CortAmmon –

回答

1

而不必一堆線程做X1的事情,暫停他們,然後有一堆的線程做X2的事情,考慮一個線程池。一個線程池是一堆線程,它們一直處於空閒狀態,直到你有工作要做,然後解除它的注入並完成工作。

該系統的一個優點是它使用條件變量和互斥而不是信號量。在許多系統上,互斥量比信號量更快(因爲它們更有限)。

// a task is an abstract class describing "something that can be done" which 
// can be put in a work queue 
class Task 
{ 
    public: 
     virtual void run() = 0; 
}; 

// this could be made more Object Oriented if desired... this is just an example. 
// a work queue 
struct WorkQueue 
{ 
    std::vector<Task*> queue; // you must hold the mutex to access the queue 
    bool    finished; // if this is set to true, threadpoolRun starts exiting 
    pthread_mutex_t  mutex; 
    pthread_cond_t  hasWork; // this condition is signaled if there may be more to do 
    pthread_cond_t  doneWithWork; // this condition is signaled if the work queue may be empty 
}; 

void threadpoolRun(void* queuePtr) 
{ 
    // the argument to threadpoolRun is always a WorkQueue* 
    WorkQueue& workQueue= *dynamic_cast<WorkQueue*>(queuePtr); 
    pthread_mutex_lock(&workQueue.mutex); 

    // precondition: every time we start this while loop, we have to have the 
    // mutex. 
    while (!workQueue.finished) { 
     // try to get work. If there is none, we wait until someone signals hasWork 
     if (workQueue.queue.empty()) { 
      // empty. Wait until another thread signals that there may be work 
      // but before we do, signal the main thread that the queue may be empty 
      pthread_cond_broadcast(&workQueue.doneWithWOrk); 
      pthread_cond_wait(&workQueue.hasWork, &workQueue.mutex); 
     } else { 
      // there is work to be done. Grab the task, release the mutex (so that 
      // other threads can get things from the work queue), and start working! 
      Task* myTask = workQueue.queue.back(); 
      workQueue.queue.pop_back(); // no one else should start this task 
      pthread_mutex_unlock(&workQueue.mutex); 

      // now that other threads can look at the queue, take our time 
      // and complete the task. 
      myTask->run(); 

      // re-acquire the mutex, so that we have it at the top of the while 
      // loop (where we need it to check workQueue.finished) 
      pthread_mutex_lock(&workQueue.mutex); 
     } 
    } 
} 

// Now we can define a bunch of tasks to do your particular problem 
class Task_x1a 
: public Task 
{ 
    public: 
     Task_x1a(int* inData) 
     : mData(inData) 
     { } 

     virtual void run() 
     { 
      // do some calculations on mData 
     } 
    private: 
     int* mData; 
}; 

class Task_x1b 
: public Task 
{ ... } 

class Task_x1c 
: public Task 
{ ... } 

class Task_x1d 
: public Task 
{ ... } 

class Task_x2a 
: public Task 
{ ... } 

class Task_x2b 
: public Task 
{ ... } 

class Task_x2c 
: public Task 
{ ... } 

class Task_x2d 
: public Task 
{ ... } 

int main() 
{ 
    // bet you thought you'd never get here! 
    static const int numberOfWorkers = 4; // this tends to be either the number of CPUs 
              // or CPUs * 2 
    WorkQueue workQueue; // create the workQueue shared by all threads 
    pthread_mutex_create(&workQueue.mutex); 
    pthread_cond_create(&workQueue.hasWork); 
    pthread_cond_create(&workQueue.doneWithWork); 
    pthread_t workers[numberOfWorkers]; 
    int data[10]; 

    for (int i = 0; i < numberOfWorkers; i++) 
     pthread_create(&pth1, NULL, &threadpoolRun, &workQueue); 

    // now all of the workers are sitting idle, ready to do work 
    // give them the X1 tasks to do 
    { 
     Task_x1a x1a(data); 
     Task_x1b x1b(data); 
     Task_x1c x1c(data); 
     Task_x1d x1d(data); 

     pthread_mutex_lock(&workQueue.mutex); 
     workQueue.queue.push_back(x1a); 
     workQueue.queue.push_back(x1b); 
     workQueue.queue.push_back(x1c); 
     workQueue.queue.push_back(x1d); 

     // now that we've queued up a bunch of work, we have to signal the 
     // workers that the work is available 
     pthread_cond_broadcast(&workQueue.hasWork); 

     // and now we wait until the workers finish 
     while(!workQueue.queue.empty()) 
      pthread_cond_wait(&workQueue.doneWithWork); 
     pthread_mutex_unlock(&workQueue.mutex); 
    } 
    { 
     Task_x2a x2a(data); 
     Task_x2b x2b(data); 
     Task_x2c x2c(data); 
     Task_x2d x2d(data); 

     pthread_mutex_lock(&workQueue.mutex); 
     workQueue.queue.push_back(x2a); 
     workQueue.queue.push_back(x2b); 
     workQueue.queue.push_back(x2c); 
     workQueue.queue.push_back(x2d); 

     // now that we've queued up a bunch of work, we have to signal the 
     // workers that the work is available 
     pthread_cond_broadcast(&workQueue.hasWork); 

     // and now we wait until the workers finish 
     while(!workQueue.queue.empty()) 
      pthread_cond_wait(&workQueue.doneWithWork); 
     pthread_mutex_unlock(&workQueue.mutex); 
    } 

    // at the end of all of the work, we want to signal the workers that they should 
    // stop. We do so by setting workQueue.finish to true, then signalling them 
    pthread_mutex_lock(&workQueue.mutex); 
    workQueue.finished = true; 
    pthread_cond_broadcast(&workQueue.hasWork); 
    pthread_mutex_unlock(&workQueue.mutex); 

    pthread_mutex_destroy(&workQueue.mutex); 
    pthread_cond_destroy(&workQueue.hasWork); 
    pthread_cond_destroy(&workQueue.doneWithWork); 
    return data[0]; 
} 

主要注意事項:

  • 如果你有比CPU更多的任務,使額外的線程是CPU只是更多的簿記。線程池接受任意數量的任務,然後使用最高效的CPU數量對它們進行處理。
  • 如果有更多的工作比CPU(如4個CPU和1000個任務),那麼這個系統是非常非常有效的。互斥鎖/解鎖是最便宜的線程同步,你會得到一個無鎖隊列(這可能比它更值得工作)。如果你有一堆任務,它會一次只抓住一個任務。
  • 如果你的任務非常小(就像你上面的增量例子),你可以很容易地修改threadPool來一次抓取多個任務,然後串行工作。
0

與你的程序的問題是,您要同步線程相互步調一致運行。在每個線程中,每次迭代時,計數器遞增,然後調用兩個同步基元。因此,循環體中超過一半的時間花在了同步上。

在你的程序中,專櫃真的什麼都沒有做對方,所以他們真的應該相互獨立,這意味着每個線程中的反覆實際上可以做實際的計算,而不是主要的同步運行。

對於輸出的要求,可以允許每個線程把每個子計算到一個數組,主線程可以從存儲器讀取。主線程等待每個線程完成完成,然後可以從每個數組讀取以創建輸出。

void *threadfunc1(void *parm) 
{ 
    int *output = static_cast<int *>(parm); 
    for (int i = 0; i < 10; ++i) { 
     x1++; 
     output[i] = x1; 
    } 
    return NULL ; 
} 

void *threadfunc2(void *parm) 
{ 
    int *output = static_cast<int *>(parm); 
    for (int i = 0; i < 10; ++i) { 
     x2++; 
     output[i] = x2; 
    } 
    return NULL ; 
} 

int main() { 
    int out1[10]; 
    int out2[10]; 
    pthread_create(&pth1, NULL, threadfunc1, out1); 
    pthread_create(&pth2, NULL, threadfunc2, out2); 
    pthread_join(pth1, NULL); 
    pthread_join(pth2, NULL); 
    int loop = 0; 
    while (loop < 9) { 
     // iterated as a step 
     loop++; 
     printf("Final : x1 = %d, x2 = %d\n", out1[loop], out2[loop]); 
    } 
    printf("Result : x1 = %d, x2 = %d\n", out1[9], out2[9]); 
    return 1; 
} 
+0

循環到10? 8k,或者其他任何一種L1高速緩存大小都會更好。 –

+0

@MartinJames:我正在關注問題中顯示的輸出。提問者自己的代碼僅循環到8. – jxh

+0

謝謝@jxh。但正如我通過查看你的代碼所理解的那樣,你可以獨立計算並保存它。但是在更大的情況下,x2可能依賴於x1,這需要在特定的步驟求解計算最新的x1值。 –

相關問題