2011-02-09 41 views
5

我是新來的Windows多線程,所以這可能是一個微不足道的問題:確保線程以鎖步方式執行循環的最簡單方法是什麼?如何使Win32/MFC線程以鎖步方式循環?

我試着將一個共享數組Event傳遞給所有線程,並在循環結束時使用WaitForMultipleObjects來同步它們,但是這會在一個循環,有時是兩個循環之後給我一個死鎖。這裏是我當前的代碼的簡化版本(只有兩個線程,但我想,使其可擴展):

typedef struct 
{ 
    int rank; 
    HANDLE* step_events; 
} IterationParams; 

int main(int argc, char **argv) 
{ 
    // ... 

    IterationParams p[2]; 
    HANDLE step_events[2]; 
    for (int j=0; j<2; ++j) 
    { 
     step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL); 
    } 

    for (int j=0; j<2; ++j) 
    { 
     p[j].rank = j; 
     p[j].step_events = step_events; 
     AfxBeginThread(Iteration, p+j); 
    } 

    // ... 
} 

UINT Iteration(LPVOID pParam) 
{ 
    IterationParams* p = (IterationParams*)pParam; 
    int rank = p->rank; 

    for (int i=0; i<100; i++) 
    { 
     if (rank == 0) 
     { 
      printf("%dth iteration\n",i); 
      // do something 
      SetEvent(p->step_events[0]); 
      WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE); 
     } 
     else if (rank == 1) 
     { 
      // do something else 
      SetEvent(p->step_events[1]); 
      WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE); 
     } 
    } 
    return 0; 
} 

(我知道我混合使用C和C++,它實際上是傳統的C代碼,我試圖並行化。)

閱讀MSDN上的文檔,我認爲這應該起作用。但是,線程0只打印一次,偶爾兩次,然後程序掛起。這是同步線程的正確方法嗎?如果不是的話,你會推薦什麼(在MFC中是否真的沒有對障礙的內置支持?)。


EDIT:該解決方案是WRONG,甚至包括Alessandro's fix。例如,請考慮以下情況:

  1. 線程0套它的事件和呼叫等待,塊
  2. 線程1設置它的事件,並調用wait,塊
  3. 線程等待0返回,將它的事件,完成一個沒有線程1獲取控制的循環
  4. 線程0設置自己的事件並調用Wait。由於線程1沒有機會重置其事件,線程0的Wait立即返回並且線程不同步。

所以問題仍然存在:一個安全如何確保線程保持鎖步?

回答

4

介紹

我實現供你考慮一個簡單的C++程序(在Visual Studio 2010測試)。它只使用Win32 APIs(以及用於控制檯輸出和一點隨機化的標準庫)。你應該可以把它放到一個新的Win32控制檯項目中(沒有預編譯頭文件),編譯並運行。


解決方案

#include <tchar.h> 
#include <windows.h> 


//--------------------------------------------------------- 
// Defines synchronization info structure. All threads will 
// use the same instance of this struct to implement randezvous/ 
// barrier synchronization pattern. 
struct SyncInfo 
{ 
    SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {}; 
    ~SyncInfo() { ::CloseHandle(this->Semaphore); } 
    volatile unsigned int Awaiting; // how many threads still have to complete their iteration 
    const int ThreadsCount; 
    const HANDLE Semaphore; 
}; 


//--------------------------------------------------------- 
// Thread-specific parameters. Note that Sync is a reference 
// (i.e. all threads share the same SyncInfo instance). 
struct ThreadParams 
{ 
    ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {}; 
    SyncInfo &Sync; 
    const int Ordinal; 
    const int Delay; 
}; 


//--------------------------------------------------------- 
// Called at the end of each itaration, it will "randezvous" 
// (meet) all the threads before returning (so that next 
// iteration can begin). In practical terms this function 
// will block until all the other threads finish their iteration. 
static void RandezvousOthers(SyncInfo &sync, int ordinal) 
{ 
    if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive? 
     // at this point, all the other threads are blocking on the semaphore 
     // so we can manipulate shared structures without having to worry 
     // about conflicts 
     sync.Awaiting = sync.ThreadsCount; 
     wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal); 
     wprintf(L"---~~~---\n"); 

     // let's release the other threads from their slumber 
     // by using the semaphore 
     ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore 
    } 
    else { // nope, there are other threads still working on the iteration so let's wait 
     wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal); 
     ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;) 
    } 
} 


//--------------------------------------------------------- 
// Define worker thread lifetime. It starts with retrieving 
// thread-specific parameters, then loops through 5 iterations 
// (randezvous-ing with other threads at the end of each), 
// and then finishes (the thread can then be joined). 
static DWORD WINAPI ThreadProc(void *p) 
{ 
    ThreadParams *params = static_cast<ThreadParams *>(p); 
    wprintf(L"Starting thread %d\n", params->Ordinal); 

    for (int i = 1; i <= 5; ++i) { 
     wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay); 
     ::Sleep(params->Delay); 
     wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i); 
     RandezvousOthers(params->Sync, params->Ordinal); 
    } 

    wprintf(L"Finishing thread %d\n", params->Ordinal); 
    return 0; 
} 


//--------------------------------------------------------- 
// Program to illustrate iteration-lockstep C++ solution. 
int _tmain(int argc, _TCHAR* argv[]) 
{ 
    // prepare to run 
    ::srand(::GetTickCount()); // pseudo-randomize random values :-) 
    SyncInfo sync(4); 
    ThreadParams p[] = { 
     ThreadParams(sync, 1, ::rand() * 900/RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do 
     ThreadParams(sync, 2, ::rand() * 900/RAND_MAX + 100), 
     ThreadParams(sync, 3, ::rand() * 900/RAND_MAX + 100), 
     ThreadParams(sync, 4, ::rand() * 900/RAND_MAX + 100), 
    }; 

    // let the threads rip 
    HANDLE t[] = { 
     ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0), 
     ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0), 
    }; 

    // wait for the threads to finish (join) 
    ::WaitForMultipleObjects(4, t, true, INFINITE); 

    return 0; 
} 

樣本輸出

運行我的機器(雙核),該程序會產生以下的輸出:

Starting thread 1 
Starting thread 2 
Starting thread 4 
Thread 1 is executing iteration #1 (712 delay) 
Starting thread 3 
Thread 2 is executing iteration #1 (798 delay) 
Thread 4 is executing iteration #1 (477 delay) 
Thread 3 is executing iteration #1 (104 delay) 
Thread 3 is synchronizing end of iteration #1 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #1 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #1 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #1 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 2 is executing iteration #2 (798 delay) 
Thread 3 is executing iteration #2 (104 delay) 
Thread 1 is executing iteration #2 (712 delay) 
Thread 4 is executing iteration #2 (477 delay) 
Thread 3 is synchronizing end of iteration #2 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #2 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #2 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #2 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 4 is executing iteration #3 (477 delay) 
Thread 3 is executing iteration #3 (104 delay) 
Thread 1 is executing iteration #3 (712 delay) 
Thread 2 is executing iteration #3 (798 delay) 
Thread 3 is synchronizing end of iteration #3 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #3 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #3 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #3 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 2 is executing iteration #4 (798 delay) 
Thread 3 is executing iteration #4 (104 delay) 
Thread 1 is executing iteration #4 (712 delay) 
Thread 4 is executing iteration #4 (477 delay) 
Thread 3 is synchronizing end of iteration #4 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #4 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #4 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #4 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Thread 3 is executing iteration #5 (104 delay) 
Thread 4 is executing iteration #5 (477 delay) 
Thread 1 is executing iteration #5 (712 delay) 
Thread 2 is executing iteration #5 (798 delay) 
Thread 3 is synchronizing end of iteration #5 
Thread 3 is waiting on synchronization barrier 
Thread 4 is synchronizing end of iteration #5 
Thread 4 is waiting on synchronization barrier 
Thread 1 is synchronizing end of iteration #5 
Thread 1 is waiting on synchronization barrier 
Thread 2 is synchronizing end of iteration #5 
Thread 2 is the last to arrive, releasing synchronization barrier 
---~~~--- 
Finishing thread 4 
Finishing thread 3 
Finishing thread 2 
Finishing thread 1 

注意,對於簡單每個線程都有隨機的迭代持續時間,但是該線程的所有迭代將使用相同的隨機持續時間(即,它在迭代之間不會改變)。


它是如何工作的?

解決方案的「核心」在「RandezvousOthers」功能中。該函數將阻塞共享信號量(如果調用此函數的線程不是最後一個調用該函數的線程),或者重置同步結構並解除阻塞共享信號量的所有線程(如果此線程在其上函數被調用是最後一個調用函數)。

2

要使其正常工作,請將CreateEvent的第二個參數設置爲TRUE。這將使事件「手動重置」並阻止Waitxxx重置它。 然後在循環的開始處放置一個ResetEvent

+0

我不希望**'等待...'重置事件嗎?否則線程在第一個週期後不同步,不是? – suszterpatt 2011-02-09 14:25:59

+0

不,因爲第一個返回的Wait會重置事件,阻止其他Wait返回。 – Loghorn 2011-02-09 14:27:43

1

我發現這個SyncTools(下載SyncTools.zip)通過谷歌搜索「障礙同步窗口」。它使用一個CriticalSection和一個事件來實現N個線程的屏障。