2009-01-10 75 views
2

使用POSIX線程& C++,我有一個「插入操作」,它只能一次安全地完成。pthread_join - 多線程等待

如果我有多個線程在等待使用pthread_join插入,然後在完成時產生一個新線程 。它們是否都會一次收到「線程完成」信號併產生多個插入,或者可以安全地假設首先收到「線程完成」信號的線程會產生一個阻塞其他線程的新線程來創建新線程。

/* --- GLOBAL --- */ 
pthread_t insertThread; 



/* --- DIFFERENT THREADS --- */ 
// Wait for Current insert to finish 
pthread_join(insertThread, NULL); 

// Done start a new one 
pthread_create(&insertThread, NULL, Insert, Data); 

感謝您的答覆

程序基本上是一個巨大的哈希表這需要通過套接字來自客戶端的請求。

每個新的客戶端連接都會產生一個新線程,然後可以執行多個操作,特別是查找或插入。查詢可以並行進行。但插入需要「重新組合」成一個單獨的線程。你可以說查找操作可以在不產生客戶端的新線程的情況下完成,但是它們可能需要一段時間纔會導致服務器鎖定,從而丟失新的請求。該設計試圖儘可能減少系統調用和線程創建。

但現在,我知道這是不是安全的我第一次覺得我應該能湊齊東西一起

感謝

回答

3

opengroup.org on pthread_join

的多個同時呼叫的結果在pthread_join()指定相同的目標線程是不確定的。

所以,你真的不應該有幾個線程加入你以前的insertThread。

首先,當您使用C++時,我推薦使用boost.thread。它們類似於POSIX線程模型,也可以在Windows上工作。它可以幫助您使用C++,即通過使功能對象更容易使用。

其次,爲什麼你要開始一個插入元素的新線程,當你在開始下一個元素之前必須等待前一個元素完成?似乎不是經典的多線程使用。

雖然...一個典型的解決方案是讓一個工作線程從事件隊列中獲取作業,並將其他線程發佈到事件隊列中。

如果你真的只是想繼續或多或少你現在的樣子,你不得不這樣做:

  • 創建一個條件變量,像insert_finished
  • 想要做插入的所有線程,等待條件變量。
  • 只要一個線程完成插入操作,它就會觸發條件變量。
  • 由於條件變量需要一個互斥鎖,你可以通知所有的等待線程,它們都希望開始插入,但是因爲一次只有一個線程可以獲取互斥鎖,所有線程都會按順序執行插入操作。

但是您應該注意,您的同步不是以太特殊的方式實現的。由於這被稱爲insert,我懷疑你想操作一個數據結構,所以你可能想要首先實現一個線程安全的數據結構,而不是共享數據結構訪問和所有客戶端之間的同步。我也懷疑會有更多的操作,然後只是insert,這將需要適當的同步...

2

根據單一Unix Specifcation的道:「多個同時執行的結果,在pthread_join ()指定相同的目標線程未定義。「實現單線程來獲取任務的「正常方式」是設置一個條件變量(不要忘記相關的互斥量):空閒線程在pthread_cond_wait()(或pthread_cond_timedwait())中等待,當完成該工作的線程完成時,它會使用pthread_cond_signal()喚醒其中一個空閒線程。

0

其他人已經指出這有未定義的行爲。我只是補充說,完成任務的最簡單方法(只允許一個線程執行部分代碼)是使用一個簡單的互斥體 - 您需要執行該代碼的線程是MUTally EXclusive,這就是互斥體到達的地方它的名字:-)

如果你需要的代碼到一個特定的線程(如Java AWT)來運行,那麼你需要條件變量。但是,您應該認真考慮這個解決方案是否真正有效。想象一下,如果您每秒調用一次「插入操作」10000次,您需要多少上下文切換。

0

正如你剛纔提到你正在使用的哈希表與平行於插入幾個看起坐,我建議你檢查你是否可以使用並行哈希表。

由於精確的查找結果是不確定的,當你同時插入的元素,例如併發哈希地圖可能正是你需要的。但是,我並沒有在C++中使用併發散列表,但是因爲它們都可以在Java中使用,所以您肯定會找到一個在C++中執行此操作的庫。

0

,我發現它支持插入不鎖定新查找唯一庫 - Sunrise DD(我不知道它是否支持併發插入)

但是從谷歌的Sparse Hash map超過開關一倍的內存使用情況。查找應該很少發生,所以而不是試圖寫我自己的庫 ,它結合了兩者的優點,我寧願只是在安全更改時鎖定表暫停查找。

再次感謝

0

在我看來,你想序列化插入到散列表。

爲此,您需要一個鎖定 - 不產生新的線程。

0

從您的描述,看起來非常低效的,因爲你正在重新創建要插入的東西,每次插入線程。創建線程的成本不是0。

這個問題的一個更常見的解決方案是產生一個等待隊列的插入線程(即,在循環爲空時位於循環休眠中)。其他線程然後將工作項添加到隊列中。插入線程按照它們被添加的順序(或者如果你想要的話,按優先級)選擇隊列中的項目並執行相應的操作。

你所要做的就是確保加入隊列是受到保護的,這樣一次只有一個線程可以訪問修改實際隊列,並且插入線程不會忙於等待,在隊列中(請參閱條件變量)。

1

是的,因爲大多數人推薦最好的方式似乎有一個工作線程從隊列中讀取。一些代碼片段低於

pthread_t  insertThread = NULL; 
    pthread_mutex_t insertConditionNewMutex = PTHREAD_MUTEX_INITIALIZER; 
    pthread_mutex_t insertConditionDoneMutex = PTHREAD_MUTEX_INITIALIZER; 
    pthread_cond_t insertConditionNew  = PTHREAD_COND_INITIALIZER; 
    pthread_cond_t insertConditionDone  = PTHREAD_COND_INITIALIZER; 

     //Thread for new incoming connection 
     void * newBatchInsert() 
     { 
      for(each Word) 
      { 
          //Push It into the queue 
          pthread_mutex_lock(&lexicon[newPendingWord->length - 1]->insertQueueMutex); 
           lexicon[newPendingWord->length - 1]->insertQueue.push(newPendingWord); 
          pthread_mutex_unlock(&lexicon[newPendingWord->length - 1]->insertQueueMutex); 

      } 

        //Send signal to worker Thread 
        pthread_mutex_lock(&insertConditionNewMutex); 
         pthread_cond_signal(&insertConditionNew); 
        pthread_mutex_unlock(&insertConditionNewMutex); 

        //Wait Until it's finished 
        pthread_cond_wait(&insertConditionDone, &insertConditionDoneMutex); 

     } 


      //Worker thread 
      void * insertWorker(void *) 
      { 

       while(1)   
       { 

        pthread_cond_wait(&insertConditionNew, &insertConditionNewMutex); 

        for (int ii = 0; ii < maxWordLength; ++ii) 
        {     
          while (!lexicon[ii]->insertQueue.empty()) 
          { 

           queueNode * newPendingWord = lexicon[ii]->insertQueue.front(); 


           lexicon[ii]->insert(newPendingWord->word); 

           pthread_mutex_lock(&lexicon[ii]->insertQueueMutex); 
           lexicon[ii]->insertQueue.pop(); 
           pthread_mutex_unlock(&lexicon[ii]->insertQueueMutex); 

          } 

        } 

        //Send signal that it's done 
        pthread_mutex_lock(&insertConditionDoneMutex); 
         pthread_cond_broadcast(&insertConditionDone); 
        pthread_mutex_unlock(&insertConditionDoneMutex); 

       } 

      } 

      int main(int argc, char * const argv[]) 
      { 

       pthread_create(&insertThread, NULL, &insertWorker, NULL); 


       lexiconServer = new server(serverPort, (void *) newBatchInsert); 

       return 0; 
      } 
0

理想情況下,即使它們執行不同的操作,也不希望多個線程池在單個進程中。線程的可恢復性是一個重要的體系結構定義,如果你使用C,這將導致在主線程中創建pthread_join。當然,對於C++線程池又稱爲ThreadFactory,其思想是保持線程原語抽象,它可以處理傳遞給它的任何函數/操作類型。

一個典型的例子是一個網絡服務器,它具有連接池和線程池,服務連接並進一步處理它們,但都是從一個公共的線程池進程派生的。

摘要:避免PTHREAD_JOIN在主線程以外的任何地方。