2011-09-09 63 views
6

我的程序設置如下:
有一個線程安全的隊列類,一個線程將數據放在它上面而坐在一個無限循環中,另一個線程彈出數據它坐在無限循環中。我試圖想出一種方法來使用Windows事件或其他機制來製作thread_1(下圖),等待無限while循環,並且只在隊列深度大於或等於1時迭代。windows C++線程正在等待隊列數據推送

class thread-safe_Queue 
{ 
public: 
    push(); 
    pop(); 
}; 

DWORD thread_1() 
{ 
while(1) 
{ 
    // wait for thread-safe queue to have data on it 
    // pop data off 
    // process data 
} 
} 

DWORD thread_2() 
{ 
while(1) 
{ 
    // when data becomes available, push data onto thread-safe queue 
} 
} 

回答

0

這個怎麼樣(我假設你熟悉事件機制)。

1.

thread_safe_Queue::push(something) 
{ 
// lock the queue 
... 
// push object 
// Signal the event 
SetEvent(notification); 

// unlock the queue 
} 

2.

thread_safe_Queue::pop(something) 
{ 
WaitForSingleObject(notification); 
// lock the queue 
... 
// get object 
// reset the event 
if (queue is empty) 
    ResetEvent(notification); 

// unlock the queue 
} 

3. thread_1只是試圖彈出對象和處理它。由於推送事件,事件已啓用,因此可以成功調用pop。否則它會在pop內等待。實際上,您可以使用其他同步對象,如互斥體或臨界區域,而不是此例中的事件。

UPDATE。外部事件: 線程1:

void thread_1() 
    { 
    while(1) 
    { 
    WaitForSingleObject(notification); 
    if (!pop(object)) // pop should return if there are any objects left in queue 
     SetEvent(notification);  
    } 
    } 

thread_2

void thread_2() 
    { 
    while(1) 
    { 
    // push the object and than signal event 
    ResetEvent(notification) 
    } 
    } 
+0

我寧願將事件放在類之外,並且在線程入口函數內部。原因是線程還等待着第二個事件。那就是當用戶想要結束程序,並因此結束無限循環時。當發生這種情況時,用戶將發送一個關閉程序的命令,推送線程將停止監聽數據並關閉,並且推送線程將停止等待線程在其中存儲數據,並且也將關閉。 – rossb83

+0

你可以對外部事件做同樣的事情。我更新了上面的答案。 – Werolik

+0

你是不是指thread1調用reset並且線程2在外部版本中調用set?另外,你在這種情況下如何避免死鎖:1. thread1無法彈出。 2.線程2調用集。 3. thread1調用重置。 – Nir

0

您可以使用命名的事件。每個線程都會調用以相同名稱傳遞的CreateEvent。然後使用WaitForMultipleObjects等待隊列相關事件或結束程序事件。彈出線程將等待queue_has_data和end_program事件。推送線程將等待data_available和end_program事件,並在將某些事物放入隊列時設置queue_has_data事件。

2

我認爲這可能會訣竅。派生類Event並重載Process()函數。

#include <process.h> // Along with all the normal windows includes 

//********************************************* 
using namespace os; 

Mutex globalQueueMutex; 

class QueueReader : public Event 
{ 
public: 
    virtual void Process() 
    { 
     // Lock the queue 
     Locker l(globalQueueMutex); 
     // pop data off 
     // process data 
     return; // queue will automatically unlock 
    } 
}; 

QueueReader myQueueReader; 

//********************************************* 
// The queue writer would have functions like : 
void StartQueueReader() 
{ 
    Thread(QueueReader::StartEventHandler, &myQueueReader); 
} 
void WriteToQueue() 
{ 
    Locker l(globalQueueMutex); 
    // write to the queue 
    myQueueReader.SignalProcess(); // tell reader to wake up 
} 
// When want to shutdown 
void Shutdown() 
{ 
    myQueueReader.SignalShutdown(); 
} 

下面是執行魔法類。

namespace os { 

// ********************************************************************** 
/// Windows implementation to spawn a thread. 
static uintptr_t Thread (void (*StartAddress)(void *), void *ArgList) 
{ 
    return _beginthread(StartAddress, 0, ArgList); 
} 

// ********************************************************************** 
/// Windows implementation of a critical section. 
class Mutex 
{ 
public: 
    // Initialize section on construction 
    Mutex() { InitializeCriticalSection(&cs_); } 
    // Delete section on destruction 
    ~Mutex() { DeleteCriticalSection(&cs_); } 
    // Lock it 
    void lock() { EnterCriticalSection(&cs_); } 
    // Unlock it 
    void unlock() { LeaveCriticalSection(&cs_); } 

private: 
    CRITICAL_SECTION cs_; 
}; // class Mutex 

/// Locks/Unlocks a mutex 
class Locker 
{ 
public: 
    // Lock the mutex on construction 
    Locker(Mutex& mutex): mutex_(mutex) { mutex_.lock(); } 
    // Unlock on destruction 
    ~Locker() { mutex_.unlock(); } 
private: 
    Mutex& mutex_; 
}; // class Locker 

// ********************************************************************** 
// Windows implementation of event handler 
#define ProcessEvent hEvents[0] 
#define SetTimerEvent hEvents[1] 
#define ShutdownEvent hEvents[2] 

/// Windows implementation of events 
class Event 
{ 
    /// Flag set when shutdown is complete 
    bool Shutdown; 
    /// Max time to wait for events 
    DWORD Timer; 
    /// The three events - process, reset timer, and shutdown 
    HANDLE hEvents[3]; 

public: 
    /// Timeout is disabled by default and Events assigned 
    Event(DWORD timer = INFINITE) : Timer(timer) 
    { 
    Shutdown = false; 
    ProcessEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    SetTimerEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    ShutdownEvent = CreateEvent(NULL,TRUE,FALSE,NULL); 
    } 

    /// Close the event handles 
    virtual ~Event() 
    { 
    CloseHandle(ProcessEvent); 
    CloseHandle(SetTimerEvent); 
    CloseHandle(ShutdownEvent); 
    } 

    /// os::Thread calls this to start the Event handler 
    static void StartEventHandler(void *pMyInstance) 
    { ((Event *)pMyInstance)->EventHandler(); } 
    /// Call here to Change/Reset the timeout timer 
    void ResetTimer(DWORD timer) { Timer = timer; SetEvent(SetTimerEvent); } 
    /// Set the signal to shutdown the worker thread processing events 
    void SignalShutdown() { SetEvent(ShutdownEvent); while (!Shutdown) Sleep(30);} 
    /// Set the signal to run the process 
    void SignalProcess() { SetEvent(ProcessEvent); } 

protected: 
    /// Overload in derived class to process events with worker thread 
    virtual void Process(){} 
    /// Override to process timeout- return true to terminate thread 
    virtual bool Timeout(){ return true;} 

    /// Monitor thread events 
    void EventHandler() 
    { 
    DWORD WaitEvents; 
    while (!Shutdown) 
    { 
     // Wait here, looking to be signaled what to do next 
     WaitEvents = WaitForMultipleObjects(3, hEvents, FALSE, Timer); 

     switch (WaitEvents) 
     { 
     // Process event - process event then reset for the next one 
     case WAIT_OBJECT_0 + 0: 
      Process(); 
      ResetEvent(ProcessEvent); 
      break; 

     // Change timer event - see ResetTimer(DWORD timer) 
     case WAIT_OBJECT_0 + 1: 
      ResetEvent(SetTimerEvent); 
      continue; 

     // Shutdown requested so exit this thread 
     case WAIT_OBJECT_0 + 2: 
      Shutdown = true; 
      break; 

     // Timed out waiting for an event 
     case WAIT_TIMEOUT: 
      Shutdown = Timeout(); 
      break; 

     // Failed - should never happen 
     case WAIT_FAILED: 
      break; 

     default: 
      break; 
     } 
    } 
    } 


}; 

} // namespace os