2010-03-03 40 views
2

(簡而言之:main()的WaitForSingleObject掛起在下面的程序中)。ReleaseSemaphore不釋放信號量

我正在嘗試寫一段代碼調度線程並等待它們在恢復之前完成。我不是每次都創建線程,而是花費很大,所以我讓他們睡着了。主線程以CREATE_SUSPENDED狀態創建X個線程。

同步使用X作爲MaximumCount的信號量完成。信號量的計數器被置零,線程被調度。在他們進入睡眠狀態之前,Thred會執行一些愚蠢的循環並調用ReleaseSemaphore。然後,主線程使用WaitForSingleObject X來確保每個線程完成其工作並正在休眠。然後它循環並再次完成。

程序不時退出。當我扼殺程序,我可以看到WaitForSingleObject掛起。這意味着線程的ReleaseSemaphore不起作用。沒有什麼是printf'ed所以沒有出錯。

也許兩個線程不應調用ReleaseSemaphore在精確的同一時間,但是這將抵消信號量的目的...

我只是不神交它...

其他解決方案同步線程被感激地接受!

#define TRY 100 
#define LOOP 100 

HANDLE *ids; 
HANDLE semaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
float x = 1.0f; 
while(1) 
{ 
    for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 
    printf(" ReleaseSemaphore error : %d ", GetLastError()); 
    SuspendThread(ids[(int) lpParameter]); 
} 
return (DWORD)(int)x; 
} 

int main() 
{ 
SYSTEM_INFO sysinfo; 
GetSystemInfo(&sysinfo); 
int numCPU = sysinfo.dwNumberOfProcessors; 

semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL); 
ids = new HANDLE[numCPU]; 

for (int j=0 ; j<numCPU ; j++) 
    ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL); 

for (int j=0 ; j<TRY ; j++) 
{ 
    for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 
    for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 
} 
CloseHandle(semaphore); 
printf("Done\n"); 
getc(stdin); 
} 

回答

3

在下列情況下,問題發生:

主線程恢復工作線程:

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

工作者線程做他們的工作,並釋放信號:

for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 

所有工作線程的主線程等待和重置信號:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 

主線程進入下一輪,試圖恢復工作線程(請注意,工作線程沒有事件暫停自己呢!這是問題開始的地方......你試圖恢復不一定被暫停尚)主題:

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

終於工作者線程掛起自己(雖然他們應該已經開始下一輪):

SuspendThread(ids[(int) lpParameter]); 

因爲所有的工人正在懸掛在主線程永遠等待:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 

這裏有一個鏈接,顯示瞭如何正確地解決生產者/消費者問題:

http://en.wikipedia.org/wiki/Producer-consumer_problem

也是我認爲critical sections比信號量和互斥體快得多。他們在大多數情況下也更容易理解(imo)。

+0

不錯的理論/解釋。在恢復線程之前將主要進入睡眠狀態(1)似乎解決了這個問題,但隨後性能消失了。至少這證實了這個理論:主要的恢復線程還沒有睡覺\ o / – Gabriel 2010-03-03 22:57:04

3

我不理解的代碼,但是線程同步肯定是不好的。您假定線程將按特定順序調用SuspendThread()。一個成功的WaitForSingleObject()調用不會告訴你線程調用ReleaseSemaphore()。因此,您將在未掛起的線程上調用ReleaseThread()。這很快就使程序陷入僵局。

另一個不好的假設是一個線程在WFSO返回後已經調用了SuspendThread。通常是的,並不總是。該線程可以在RS呼叫之後立即被搶佔。您將再次在未掛起的線程上調用ReleaseThread()。這通常需要一天左右的時間來鎖定你的程序。

我覺得有一個ReleaseSemaphore調用太多了。毫無疑問,試圖解開它。

你不能用Suspend/ReleaseThread()來控制線程,不要嘗試。

+0

好吧,我不承擔任何SuspendThread順序。最內層的循環實際上是以任意順序恢復線程,但隨後我調用WaitForSingleObject的numCPU次數,這將等待numCPU ReleaseSemaphore(),這可能以任何順序發生。 – Gabriel 2010-03-03 22:26:15

+0

@ Gabriel:我不認爲你會解決這個問題,直到你看到在未掛起的線程上調用ReleaseThread是你的主要問題。 – 2010-03-04 00:30:55

+0

我沒能理解你的第二段。現在我明白了,這與stmax'一樣。感謝您的解釋! – Gabriel 2010-03-04 13:04:27

4

而不是使用信號量(至少直接)或主要顯式喚醒線程來完成一些工作,我總是使用線程安全隊列。當main想要一個工作線程執行某些操作時,它會將該工作的描述推送到隊列中。工作線程每個只做一個工作,然後嘗試從隊列中彈出另一個工作,並最終暫停,直到隊列中有工作要做:

隊列的代碼如下所示:

#ifndef QUEUE_H_INCLUDED 
#define QUEUE_H_INCLUDED 

#include <windows.h> 

template<class T, unsigned max = 256> 
class queue { 
    HANDLE space_avail; // at least one slot empty 
    HANDLE data_avail; // at least one slot full 
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos 

    T buffer[max]; 
    long in_pos, out_pos; 
public: 
    queue() : in_pos(0), out_pos(0) { 
     space_avail = CreateSemaphore(NULL, max, max, NULL); 
     data_avail = CreateSemaphore(NULL, 0, max, NULL); 
     InitializeCriticalSection(&mutex); 
    } 

    void push(T data) { 
     WaitForSingleObject(space_avail, INFINITE);  
     EnterCriticalSection(&mutex); 
     buffer[in_pos] = data; 
     in_pos = (in_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(data_avail, 1, NULL); 
    } 

    T pop() { 
     WaitForSingleObject(data_avail,INFINITE); 
     EnterCriticalSection(&mutex); 
     T retval = buffer[out_pos]; 
     out_pos = (out_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(space_avail, 1, NULL); 
     return retval; 
    } 

    ~queue() { 
     DeleteCriticalSection(&mutex); 
     CloseHandle(data_avail); 
     CloseHandle(space_avail); 
    } 
}; 

#endif 

在線程中使用它的大致等價代碼看起來像這樣。我沒有完全理清你的線程函數在做什麼,但是它是用平方根求和的東西,顯然你對線程同步更感興趣,而不是線程實際做的那一刻。

編輯:(基於評論): 如果您需要main()等待一些任務來完成,做更多的工作,然後分配更多的任務,它通常是最好的處理,通過把一個事件(例如)成每個任務,並讓你的線程函數設置事件。修改後的代碼要做到這一點應該是這樣的(注意,隊列代碼不受影響):

#include "queue.hpp" 

#include <iostream> 
#include <process.h> 
#include <math.h> 
#include <vector> 

struct task { 
    int val; 
    HANDLE e; 

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { } 
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {} 
}; 

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p); 

    task t; 
    while (-1 != (t=q.pop()).val) { 
     std::cout << t.val << "\n"; 
     SetEvent(t.e); 
    } 
} 

int main() { 
    queue<task> jobs; 

    enum { thread_count = 4 }; 
    enum { task_count = 10 }; 

    std::vector<HANDLE> threads; 
    std::vector<HANDLE> events; 

    std::cout << "Creating thread pool" << std::endl; 
    for (int t=0; t<thread_count; ++t) 
     threads.push_back((HANDLE)_beginthread(process, 0, &jobs)); 
    std::cout << "Thread pool Waiting" << std::endl; 

    std::cout << "First round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+1); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE); 

    events.clear(); 

    std::cout << "Second round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+20); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE); 
    events.clear(); 

    for (int j=0; j<thread_count; ++j) 
     jobs.push(-1); 

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE); 

    return 0; 
} 
+0

據我瞭解你的代碼,同步發生在最後一個WaitForMultipleObjects,當你等待所有的線程死亡。我不能讓他們死。代碼處於無限循環中,需要線程休眠一段時間,然後進行更多工作。 我需要確保所有的工作都由主線程前的線程完成,並在返回到那些線程之前執行其他的工作,併爲他們提供更多的工作。我不能重新創建新線程,這太慢了,即使使用_beginthreads()。 – Gabriel 2010-03-03 22:42:48

+0

不錯的解決方案。然而,一個觀察結果是線程和事件中的所有句柄都會泄漏。他們需要用CloseHandle清理清理。 – 2015-10-06 13:19:08

0

問題是,您比信號發送更頻繁地等待。

for (int j=0 ; j<TRY ; j++)的循環等待八倍信號量,而四個線程將只有信號每一次和環路本身信號一次。第一次通過循環,這不是問題,因爲信號量的初始值爲4。第二次和以後的每一次,你都在等待太多的信號。這可以通過在前四個等待時間限制時間並且不重試錯誤的事實來緩解。所以有時候它可能會奏效,有時候你會等待。

我認爲以下(未經測試)的更改將有所幫助。

信號量初始化爲零計數:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

擺脫線程恢復環路的等待(即刪除以下):

if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
     printf("Timed out !!!\n"); 

從試循環結束卸下外來信號(即除去以下內容):

ReleaseSemaphore(semaphore,numCPU,NULL); 
0

這裏是一個實用的解決方案。

我希望我的主程序使用線程(然後使用多個核心)來完成作業並等待所有線程完成,然後再繼續執行其他任務。我不想讓線程死亡並創建新線程,因爲這很慢。在我的問題中,我試圖通過暫停線程來做到這一點,這看起來很自然。但正如nobugz指出的,「你可以用Suspend/ReleaseThread()來控制線程。」

解決方案涉及像我用來控制線程的信號量。實際上一個信號量被用來控制主線程。現在我每個線程都有一個信號來控制線程和一個信號來控制主線程。

這裏是解決方案:

#include <windows.h> 
#include <stdio.h> 
#include <math.h> 
#include <process.h> 

#define TRY 500000 
#define LOOP 100 

HANDLE *ids; 
HANDLE *semaphores; 
HANDLE allThreadsSemaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
    float x = 1.0f;   
    while(1) 
    { 
     WaitForSingleObject(semaphores[(int)lpParameter],INFINITE); 
     for (int i=1 ; i<LOOP ; i++) 
      x = sqrt((float)i*x+rand()); 
     ReleaseSemaphore(allThreadsSemaphore,1,NULL); 
    } 
    return (DWORD)(int)x; 
} 

int main() 
{ 
    SYSTEM_INFO sysinfo; 
    GetSystemInfo(&sysinfo); 
    int numCPU = sysinfo.dwNumberOfProcessors; 

    ids = new HANDLE[numCPU]; 
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++) 
    { 
     ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL); 
     // Threads blocked until main releases them one by one 
     semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL); 
    } 
    // Blocks main until threads finish 
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

    for (int j=0 ; j<TRY ; j++) 
    { 
     for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs 
      ReleaseSemaphore(semaphores[i],1,NULL); 
     for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish 
      WaitForSingleObject(allThreadsSemaphore,INFINITE); 
    } 
    for (int j=0 ; j<numCPU ; j++) 
     CloseHandle(semaphores[j]); 
    CloseHandle(allThreadsSemaphore); 
    printf("Done\n"); 
    getc(stdin); 
}