隊列具有 FIFO 語義,本身無法過濾元素,除非您只是想丟棄不符合條件的元素(這只需包裝一下 pop 函數即可)。
典型的解決方案是使用:
- 一個優先級隊列
- 兩個單獨的隊列,一個用於數據,一個用於命令。
如果數據隊列中的元素數量永遠不會超過 x,請考慮使用循環緩衝區;spsc_queue 就是以環形緩衝區作爲底層存儲的。
更新:針對問題的修改(question edit),我決定寫一個演示,用帶外信令(out-of-band signaling)來傳遞每個 ID 的過濾狀態。
讓我逐步說明,先從常用的定義開始:
// Number of distinct packet sources; it sizes the per-source flag and counter arrays.
static constexpr uint8_t NUM_SOURCES{32};
接下來是消費者(consumer)和生產者雙方共享的公共定義:
// State shared by the producer and consumer threads: the queue item type,
// the shutdown flag, the statistics counters, the per-source filter flags
// and the queue itself.
namespace queueing {

    // Payload type of a queue item.
    using data_t = std::vector<char>; // just for demo

    // Element type stored in the shared queue: a source ID plus its payload.
    struct spsc_queue_item {
        uint8_t ID;
        data_t  data;
    };

    // Process control: both thread loops run until this becomes true.
    boost::atomic_bool shutdown_flag(false);

    // Counters that report() prints.
    namespace statistics {

        namespace events {
            boost::atomic_size_t occurred(0);
        }

        namespace packets {
            boost::atomic_size_t queued(0);
            boost::atomic_size_t dropped(0);
            boost::atomic_size_t processed(0);
            boost::atomic_size_t skipped[NUM_SOURCES] = {}; // one counter per source ID
        }

        boost::atomic_size_t idle_cycles(0);

        void report();
    }

    // Business logic: per-source filter state.
    // true:  started (items from this source are processed)
    // false: stopped (items from this source are skipped)
    boost::atomic_bool source_enabled [NUM_SOURCES] = {};

    // Fixed-capacity single-producer/single-consumer queue carrying the data items.
    boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue;
}
如您所見,我更改了數據的類型(不用 void* 更容易演示)。另外,我還添加了一些有用的統計數據,可以在運行結束時通過 report() 輸出。
void producer_thread() {
using namespace boost;
namespace stats = queueing::statistics;
// helpers to generate random data packets or start/stop filter events
enum kind_t { knd_data, knd_start, knd_stop };
queueing::data_t const empty {};
struct event_t { kind_t kind; spsc_queue_item item; };
// ...
// now generate queue items in a loop
while (!queueing::shutdown_flag) {
auto evt = gen_event();
std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400));
switch(evt.kind) {
case knd_data:
stats::events::occurred++;
if (queueing::shared_queue.push(evt.item)) {
stats::packets::queued++;
} else {
stats::packets::dropped++;
}
break;
case knd_start: {
bool expected = false;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, true))
std::cout << "+";// << static_cast<int>(evt.item.ID);
}
break;
case knd_stop: {
bool expected = true;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, false))
std::cout << "-";// << static_cast<int>(evt.item.ID);
}
break;
}
}
}
線程函數的主體相當直接,但值得注意的一點是:start 和 stop 事件並不經過隊列傳遞。
消費者更簡單。它所做的只是排空隊列並更新一些統計計數器。在處理每個元素之前,會先檢查相應的過濾狀態(source_enabled):
// Demo driver: starts the producer and consumer threads, lets them run for
// one second, requests shutdown, joins both threads and prints the statistics.
int main() {
    // check no source_enabled flags are set at start
    assert(std::count(std::begin(queueing::source_enabled),
                      std::end(queueing::source_enabled),
                      true) == 0);

    std::thread producer(producer_thread);
    std::thread consumer(consumer_thread);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    queueing::shutdown_flag = true; // ask both loops to stop

    if (producer.joinable())
        producer.join();
    if (consumer.joinable())
        consumer.join();

    queueing::statistics::report();
}
我們程序同時運行:
void consumer_thread() {
namespace stats = queueing::statistics;
queueing::spsc_queue_item item;
auto consume_pending = [&] {
while (queueing::shared_queue.pop(item)) {
if (queueing::source_enabled[item.ID])
fake_process(item); // if filtering started, process
else
stats::packets::skipped[item.ID]++; // if filtering stopped, skip
}
};
while (!queueing::shutdown_flag) {
consume_pending();
stats::idle_cycles++;
}
consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag
}
現在一切應該都不言自明了:main() 函數讓兩個線程運行約 1 秒鐘,然後等待它們結束(join)。接着它會輸出統計數字,在我的系統上看起來像這樣:
++-+++++++--+++-++++-++-+++---+-+-+-+++++-+--+---+++-++---+-++-++-+-+++---++--+++-++---+----+-+-+-+--+++-++--+--+--++--+-+-+-+--+--+++--++-+-++-++-+--+--+++-++-+---+----++-+++-+-++-+----+--+-+-+--+++--+++++-+-+--++-+--++++-+-+---++-+---+-+--++---++++----+-+---+-+-+-+--+-++--+-+++--+++-+----+-+-+-+++-+++--+-++-++++++---++--+-++-++---+-+-++--+-+-----++---+-+-+--+++--++---++--+-+++-++++-+++-+-+--+++-+-+----+-++++-+--+++----+++-------+-++-+-+-++++-++++---++-+---+-++-----+-++++----+++-++++--+--+-----+-++++----++++-+++-+---+---+-+-++++-++---+-++-+-+-+++-+-+--+-----++-+++---+-++---+++-++-+--+++++------++---+-++++-+-+-+--++++-++++-+--+++-++---+-----++-+-++-+-+++--++-+-+-++-++-----+-++--+--+--+-------++--+-++-+--++-++-++--+-+-++-+-+++-++++-+---+--+++--++--+-+++++-+-----++--++--+++--++-+---++----+--+-+--++-++---+++++++-+--+-++---+----+-+-+--+-+-+--++++-++--+--+-+---+++-+++++++-++-+-----+--++------+-++++++--++-++-+---+-++---++-++------+-++--+-++-+++--+++-+++-+-+--+-+--+--+---+-+-+-+--+-++-+-++---+++-+-+-++--+-++-+---++--+-+--++-+++-+--+++---+----+--++-++++++-++-+----+++-+-+--+++-----+---+--++-+--+-++++++-+-+++--+++---+-+-++++-++-+-+----++++----+++-++----+---++-+---++-+-+-++--+++---+--+++----++-++-+++--+--+---+++--+--+--+--+--++++-++++---+-+-+--+-+-+--++++--+-+--++--++++----++-++++++-+--+-+------+-----+++----++-+++++-+--+--+---++-+-++-+--++++-+++---+++-+----+--+++++-+-+--+++--++-+++-+-++---++-++-+-+-+--+-++--+---+-+++--+++++-----+-++-+-+++-+-+-------++++---+-+-++-+--+++++---+--++-+-++-+++----+++-++++---++------+-+---++++--+-+---+++------++++++---++-+----+-+++-+--++-+-+-+-----+-++-++-++--++-+-+-++++++--++---+-+-+-+-+-+-++-++-++----++--++-+++-++---+++--+++---+++--+-+++----++--+-+-+++---++---++-+--+++++-+---++----++--+++-+--+-+++++++-+--+---+--+---+----+-++-++-+--++--+--++-++---+++++--+-+---+-++-+-+----+++-++-+-+--+---+-++-+-----++---++++--+++++-+---+-++--+-+-+----+--++++-+-----++++--++-+-+++++----+++---+++++++--+---+--+--++++--+++-----+-++--+-+-----+++++----+-++++---+-++--+-++-++
+--+++-+-+++++--+----++--+--+-+-++-----++-+--++--++++++-+-+++----++++---++-+--+-+------+-+--+++++--+++--++-----+--++-++-+++++-++-------+----++-++--+--++--++++-++---+-+++++----+-++-++---+++---+-++-++----++--++--+++++-+--+-----+-+-+-+-+++-+--++-+-+++--+-+-+++-+-++--+-+-+-+--+-+-+++++---+---+-++-+---++-+-++-+-+++-++-++-+-++-------++---+-++-++++-++--++--+-++-+++---++++--+----+---+-++-+++--+-+++---+-++-++----+--+--+-++--+-++-++++++--+-++-+--+---+-+--+-+--++---+--+-++--++--+--++-++++----+--+--+++-+++-+-+-++--++-+-+---+-+-------+--++++++-++++++-++-+-++-+---+--+-+-++--+++---+----+--+--+-++----+-+-++-++-++-+++--++---++-------+++++--+-+++++++--+--+-+--++--++--++-+--+--+++----+++++-++-------++---+-+--++-++--+++-+-+-+-+------+-+--+++++-+-+--++-++-++--+++++++---+-++--+++-+++--++++-++--+-+---+----+----+---+--+-+++-+-+++++---+--++--+-+++-+++++--+---+-+++++-+---++++--+-++----+---++----+++---+++++-+-++--+--+-++-++----+---++-++-+-+-+---+++-++-+++-+---+++--+-+-----++-+---++-+---++---+-++--++++-+--++-+-++----+-+-+--++--++++--+--++--+--+-+-+++++++--++-+-+-+++--+---+++--++++++--+-+-----+---++-+++--+++--++---+++--+--+-++++-----+++-----++++--++--+-+--
Events occurred: 3061
Queued packets: 3061
Dropped packets: 0
Processed packets: 1464
Filtered (per source) 58 48 53 51 47 39 45 42 53 52 57 50 63 43 49 57 45 58 40 42 56 54 58 52 44 53 61 41 50 33 51 52
Total filtered: 1597
Idle cycles: 26408166
第一行(++-+++++++--+++-++++-++-+++---+ ...)是一種速記符號,記錄 source_enabled[] 標誌每一次實際發生的變化(+ 表示啓用,- 表示停用)。
你可以看到,在這個速度下,隊列不飽和,消費者線程有很多空閒週期。
完整代碼清單,供參考:
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/random.hpp>
// Number of distinct packet sources; it sizes the per-source flag and counter
// arrays and bounds the randomly generated source IDs.
static constexpr uint8_t NUM_SOURCES{32};
// State shared by the producer and consumer threads: the queue item type,
// the shutdown flag, the statistics counters (plus their report), the
// per-source filter flags and the queue itself.
namespace queueing {
    using data_t = std::vector<char>; // just for demo

    // Element type stored in the shared queue: a source ID plus its payload.
    struct spsc_queue_item {
        uint8_t ID;
        data_t  data;
    };

    // process control: both thread loops run until this becomes true
    boost::atomic_bool shutdown_flag { false };

    namespace statistics {
        namespace events {
            boost::atomic_size_t occurred { 0 };  // data events generated by the producer
        }
        namespace packets {
            boost::atomic_size_t queued { 0 };    // pushes that succeeded
            boost::atomic_size_t dropped { 0 };   // pushes that failed
            boost::atomic_size_t processed { 0 }; // items handed to fake_process()
            boost::atomic_size_t skipped[NUM_SOURCES] = {}; // per source: items popped while the source was disabled
        }
        boost::atomic_size_t idle_cycles { 0 };   // consumer loop iterations

        // Prints every counter to std::cout, including the per-source skipped
        // counts and their total.
        void report() {
            namespace stats = queueing::statistics;

            std::cout << "\n";
            std::cout << "Events occurred: " << stats::events::occurred << "\n";
            std::cout << "Queued packets: " << stats::packets::queued << "\n";
            std::cout << "Dropped packets: " << stats::packets::dropped << "\n";
            std::cout << "Processed packets: " << stats::packets::processed << "\n";
            std::cout << "Filtered (per source) ";
            std::copy(std::begin(stats::packets::skipped), std::end(stats::packets::skipped),
                      std::ostream_iterator<size_t>(std::cout, " "));
            std::cout << "\n";

            // The seed's type is the accumulator's type. Seeding with std::size_t
            // keeps the sum at the counters' own width; a `0ul` seed would narrow
            // it wherever unsigned long is only 32 bits wide.
            auto total_filtered = std::accumulate(std::begin(stats::packets::skipped),
                                                  std::end(stats::packets::skipped),
                                                  std::size_t(0));
            std::cout << "Total filtered: " << total_filtered << "\n";
            std::cout << "Idle cycles: " << stats::idle_cycles << "\n";
        }
    }

    // business logic
    boost::atomic_bool source_enabled [NUM_SOURCES] = {}; // true:started (process)/false:stopped (skip)
    boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue;
}
// Producer side of the demo. Until shutdown is requested it generates random
// events: data events are pushed onto the shared queue, while start/stop
// events are NOT queued -- they flip the per-source source_enabled flag
// directly.
void producer_thread() {
    using namespace boost;
    namespace stats = queueing::statistics;

    // generate random data packets or start/stop filter events
    using queueing::spsc_queue_item;
    mt19937 engine; // default-constructed, no explicit seed

    // random source ID in [0, NUM_SOURCES-1]
    auto gen_srce = bind(uniform_int<uint8_t>(0, NUM_SOURCES-1), ref(engine));

    // random payload of engine()%1024 bytes
    auto gen_data = [&] {
        std::vector<char> v;
        std::generate_n(back_inserter(v), engine()%1024, bind(uniform_int<uint8_t>{}, ref(engine)));
        return v;
    };

    enum kind_t { knd_data, knd_start, knd_stop };
    auto gen_kind = bind(uniform_int<uint8_t>(knd_data, knd_stop), ref(engine));

    queueing::data_t const empty {};

    // An event is a kind plus a queue item; only data events carry a payload.
    struct event_t { kind_t kind; spsc_queue_item item; };

    auto gen_event = [&] {
        auto kind = static_cast<kind_t>(gen_kind());
        return event_t {
            kind,
            spsc_queue_item {
                gen_srce(),
                kind == knd_data? gen_data() : empty
            }
        };
    };

    // Sets the filter flag of source `id` to `enable` and prints `mark` only
    // when the flag actually changed. The strong compare-exchange is used on
    // purpose: the weak form is allowed to fail spuriously, and since there is
    // no retry loop that would silently lose a start/stop event.
    auto set_source_enabled = [](uint8_t id, bool enable, char const* mark) {
        bool expected = !enable;
        if (queueing::source_enabled[id].compare_exchange_strong(expected, enable))
            std::cout << mark; // append static_cast<int>(id) to see which source toggled
    };

    // now that we can easily generate queue items, let's do so in a loop
    while (!queueing::shutdown_flag) {
        auto evt = gen_event();

        // random pause of less than 102400 ns between events
        std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400));

        switch(evt.kind) {
            case knd_data:
                stats::events::occurred++;
                if (queueing::shared_queue.push(evt.item)) {
                    stats::packets::queued++;
                } else {
                    stats::packets::dropped++; // push() reported failure: item not enqueued
                }
                break;
            case knd_start:
                set_source_enabled(evt.item.ID, true, "+");
                break;
            case knd_stop:
                set_source_enabled(evt.item.ID, false, "-");
                break;
        }
    }
}
// Stand-in for real work on a queue item: sleeps for one microsecond per
// payload byte, then counts the item as processed.
void fake_process(queueing::spsc_queue_item const& item) {
    // pretend it takes time proportional to the amount of data
    auto const simulated_cost = std::chrono::microseconds(item.data.size());
    std::this_thread::sleep_for(simulated_cost);

    ++queueing::statistics::packets::processed;
}
void consumer_thread() {
namespace stats = queueing::statistics;
queueing::spsc_queue_item item;
auto consume_pending = [&] {
while (queueing::shared_queue.pop(item)) {
if (queueing::source_enabled[item.ID])
fake_process(item); // if filtering started, process
else
stats::packets::skipped[item.ID]++; // if filtering stopped, skip
}
};
while (!queueing::shutdown_flag) {
consume_pending();
stats::idle_cycles++;
}
consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag
}
#include <cassert>
// Demo driver: starts the producer and consumer threads, lets them run for
// one second, requests shutdown, joins both threads and prints the statistics.
int main() {
    // check no source_enabled flags are set at start
    assert(std::count(std::begin(queueing::source_enabled),
                      std::end(queueing::source_enabled),
                      true) == 0);

    std::thread producer(producer_thread);
    std::thread consumer(consumer_thread);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    queueing::shutdown_flag = true; // ask both loops to stop

    if (producer.joinable())
        producer.join();
    if (consumer.joinable())
        consumer.join();

    queueing::statistics::report();
}
如果您正在複用異步網絡事件,請考慮使用 Boost Asio,它已內建全部的隊列和調度邏輯。 – sehe