通常,事件隊列被實現爲command design pattern:
在面向對象的編程中,該命令圖案是設計 圖案,其中一個目的是用來表示和封裝所有 調用所需的信息稍後的方法。此信息包括方法名稱,擁有方法 的對象以及方法參數的值。
在C++中,那些擁有方法參數的方法和值的對象是零元函子(即仿函數,它沒有參數)。它可以使用boost::bind()
或C++11 lambdas創建幷包裝成boost::function
。
這是一個極簡主義的例子,介紹如何在多個生產者和多個消費者線程之間實現一個事件隊列。用法:
void consumer_thread_function(EventQueue::Ptr event_queue)
try {
for(;;) {
EventQueue::Event event(event_queue->consume()); // get a new event
event(); // and invoke it
}
}
catch(EventQueue::Stopped&) {
}
void some_work(int n) {
std::cout << "thread " << boost::this_thread::get_id() << " : " << n << '\n';
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(500));
}
int main()
{
some_work(1);
// create an event queue that can be shared between multiple produces and multiple consumers
EventQueue::Ptr queue(new EventQueue);
// create two worker thread and pass them a pointer to queue
boost::thread worker_thread_1(consumer_thread_function, queue);
boost::thread worker_thread_2(consumer_thread_function, queue);
// tell the worker threads to do something
queue->produce(boost::bind(some_work, 2));
queue->produce(boost::bind(some_work, 3));
queue->produce(boost::bind(some_work, 4));
// tell the queue to stop
queue->stop(true);
// wait till the workers thread stopped
worker_thread_2.join();
worker_thread_1.join();
some_work(5);
}
輸出:
./test
thread 0xa08030 : 1
thread 0xa08d40 : 2
thread 0xa08fc0 : 3
thread 0xa08d40 : 4
thread 0xa08030 : 5
實現:
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/smart_ptr/detail/atomic_count.hpp>
#include <iostream>
class EventQueue
{
public:
typedef boost::intrusive_ptr<EventQueue> Ptr;
typedef boost::function<void()> Event; // nullary functor
struct Stopped {};
EventQueue()
: state_(STATE_READY)
, ref_count_(0)
{}
void produce(Event event) {
boost::mutex::scoped_lock lock(mtx_);
assert(STATE_READY == state_);
q_.push_back(event);
cnd_.notify_one();
}
Event consume() {
boost::mutex::scoped_lock lock(mtx_);
while(STATE_READY == state_ && q_.empty())
cnd_.wait(lock);
if(!q_.empty()) {
Event event(q_.front());
q_.pop_front();
return event;
}
// The queue has been stopped. Notify the waiting thread blocked in
// EventQueue::stop(true) (if any) that the queue is empty now.
cnd_.notify_all();
throw Stopped();
}
void stop(bool wait_completion) {
boost::mutex::scoped_lock lock(mtx_);
state_ = STATE_STOPPED;
cnd_.notify_all();
if(wait_completion) {
// Wait till all events have been consumed.
while(!q_.empty())
cnd_.wait(lock);
}
else {
// Cancel all pending events.
q_.clear();
}
}
private:
// Disable construction on the stack. Because the event queue can be shared between multiple
// producers and multiple consumers it must not be destroyed before the last reference to it
// is released. This is best done through using a thread-safe smart pointer with shared
// ownership semantics. Hence EventQueue must be allocated on the heap and held through
// smart pointer EventQueue::Ptr.
~EventQueue() {
this->stop(false);
}
friend void intrusive_ptr_add_ref(EventQueue* p) {
++p->ref_count_;
}
friend void intrusive_ptr_release(EventQueue* p) {
if(!--p->ref_count_)
delete p;
}
enum State {
STATE_READY,
STATE_STOPPED,
};
typedef std::list<Event> Queue;
boost::mutex mtx_;
boost::condition_variable cnd_;
Queue q_;
State state_;
boost::detail::atomic_count ref_count_;
};
C++語言並沒有真正有這種事情的原生支持。你需要使用API來處理你正在工作的任何操作系統。 – 2012-03-14 22:43:32
通常現代C++使用信號和插槽(請參閱[Boost.Signals2](http://www.boost.org/libs/signals2/)),而不是事件的消息傳遞。你所展示的方法已經過時了,所以C++沒有什麼特別的東西可以作爲支持它的語言。 – ildjarn 2012-03-14 22:51:00
做一些搜索BlockingQueue。處理程序將阻塞隊列get(),直到事件發佈到隊列。 – Java42 2012-03-14 22:51:53