2016-01-13 46 views
1

我使用boost :: lockfree :: spsc_queue將流數據從線程發送到工作線程。從boost :: lockfree :: spsc_queue過濾項目

這是項目的結構:

struct spsc_queue_item 
{ 
    uint8_t ID; 
    void *data; 
}; 

的數據通過spsc_queue.push得到插入並通過spsc_queue.pop其他線程讀取。

但我也有一些工作線程的「命令」。 像ID 0「開始過濾」, ID 1爲「阻濾波器」, ID 2 ID「數據」 ...

因此,如果大量的「數據」被推到隊列中的命令,如「停止過濾器「將被延遲,因爲首先處理了」數據「項目。 但是,如果命令「停止過濾器」進入「數據」項目是無用的,可以丟棄。

現在我知道還有成員函數「consume_one」和「consume_all」。

但我沒有找到一個例子如何使用這些函數的仿函數。 我的想法是使用consume_one作爲示例,首先檢查ID = 0或ID == 1的項是否在隊列中,然後繼續處理ID == 2的數據項。

有人有一個小例子如何使用仿函數來過濾掉所需的ID相關項目?

或者還有其他快速的方式來通過「優先級」標誌從隊列中獲取項目嗎?

UPDATE針對sehe的回答,一些更多的信息:


感謝您對這一信息。

任何想法如何使它更好?

我需要信號的工作線程像「開始過濾」,「停止過濾器」,...

我想利用事件:

SetEvent(hStartFilter); 

但在這裏我不得不使用,創建和關閉每個命令一個事件。

「數據」也可以具有不同的ID。 像工作者線程接收:

"start filter" with ID=0 
"start filter" with ID=1 

然後「數據」與ID0和ID1是進入隊列。 現在線程收到ID0的「停止過濾器」。 因此,ID0數據隊列上的所有項目都已過時並可以刪除。

我的第一個測試是抓取隊列中的所有項目。檢查每個匹配的ID並刪除該項目。剩下的其他項目在之後被推回隊列。 但是,如果有大量具有不同ID的數據(最多32個),則這會佔用很多CPU和耗時的操作。隊列的最大大小是2048個項目。

這裏還有更好的方法嗎?

+0

?如果您正在複用異步網絡事件,請考慮使用Boost Asio。它具有全部隊列和調度邏輯 – sehe

回答

0

隊列具有FIFO語義。沒有辦法過濾,除非您想要只丟棄與您的條件不符的元素。 (只是包裹pop功能)

的典型解決方案是使用

  • 一個優先級隊列
  • 兩個單獨的隊列,一個用於數據,一個用於命令。

如果數據隊列中的元素永遠不會超過x,請考慮使用循環緩衝區。 spsc_queue使用環形緩衝區作爲底層存儲。

UPDATE作爲對question edit的響應,我決定使用每個ID的過濾狀態的帶外信令創建一個演示。

讓我穿行,開始與通常的定義:

static constexpr uint8_t NUM_SOURCES = 32; 

現在,由constumer和生產者雙方共享的公共定義:

namespace queueing { 
    using data_t = std::vector<char>; // just for demo 

    struct spsc_queue_item { 
     uint8_t ID; 
     data_t data; 
    }; 

    // process control 
    boost::atomic_bool shutdown_flag { false }; 

    namespace statistics { 
     namespace events { 
      boost::atomic_size_t occurred { 0 }; 
     } 

     namespace packets { 
      boost::atomic_size_t queued { 0 }; 
      boost::atomic_size_t dropped { 0 }; 
      boost::atomic_size_t processed { 0 }; 
      boost::atomic_size_t skipped[NUM_SOURCES] = {}; 
     } 

     boost::atomic_size_t idle_cycles { 0 }; 

     void report(); 
    } 

    // business logic 
    boost::atomic_bool source_enabled [NUM_SOURCES] = {}; // true:started (process)/false:stopped (skip) 
    boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue; 
} 

正如你可以看到我變化的數據(因爲沒有void*就更容易演示)。另外,我在休息運行結束時添加了一些有用的統計數據,可以是report() -ed。

void producer_thread() { 
    using namespace boost; 
    namespace stats = queueing::statistics; 
    // helpers to generate random data packets or start/stop filter events 

    enum kind_t { knd_data, knd_start, knd_stop }; 
    queueing::data_t const empty {}; 
    struct event_t { kind_t kind; spsc_queue_item item; }; 
    // ... 

    // now generate queue items in a loop 
    while (!queueing::shutdown_flag) { 
     auto evt = gen_event(); 

     std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400)); 

     switch(evt.kind) { 
      case knd_data: 
       stats::events::occurred++; 

       if (queueing::shared_queue.push(evt.item)) { 
        stats::packets::queued++; 
       } else { 
        stats::packets::dropped++; 
       } 
       break; 
      case knd_start: { 
        bool expected = false; 
        if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, true)) 
         std::cout << "+";// << static_cast<int>(evt.item.ID); 
       } 
       break; 
      case knd_stop: { 
        bool expected = true; 
        if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, false)) 
         std::cout << "-";// << static_cast<int>(evt.item.ID); 
       } 
       break; 
     } 
    } 
} 

線程函數的主體是相當直接的,但一個值得關注的事情是,startstop事件沒有傳達過的隊列。

生產者更簡單。它所做的只是耗盡隊列,更新一些統計計數器。 處理物品之前,相應的過濾狀態(source_enabled)被檢查:

int main() { 
    using namespace std; 

    // check no source_enabled flags are set at start 
    assert(0 == count(begin(queueing::source_enabled), end(queueing::source_enabled), true)); 

    auto producer = thread(producer_thread); 
    auto consumer = thread(consumer_thread); 

    this_thread::sleep_for(chrono::seconds(1)); 
    queueing::shutdown_flag = true; 

    if (producer.joinable()) producer.join(); 
    if (consumer.joinable()) consumer.join(); 

    queueing::statistics::report(); 
} 

我們程序同時運行:

void consumer_thread() { 
    namespace stats = queueing::statistics; 

    queueing::spsc_queue_item item; 

    auto consume_pending = [&] { 
     while (queueing::shared_queue.pop(item)) { 
      if (queueing::source_enabled[item.ID]) 
       fake_process(item); // if filtering started, process 
      else 
       stats::packets::skipped[item.ID]++; // if filtering stopped, skip 
     } 
    }; 

    while (!queueing::shutdown_flag) { 
     consume_pending(); 
     stats::idle_cycles++; 
    } 

    consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag 
} 

現在,一切都應該是不言自明的是,拼湊main()功能線程約1秒鐘,然後等待它們加入。 然後它會報告統計數字,我的系統上,看起來像:

++-+++++++--+++-++++-++-+++---+-+-+-+++++-+--+---+++-++---+-++-++-+-+++---++--+++-++---+----+-+-+-+--+++-++--+--+--++--+-+-+-+--+--+++--++-+-++-++-+--+--+++-++-+---+----++-+++-+-++-+----+--+-+-+--+++--+++++-+-+--++-+--++++-+-+---++-+---+-+--++---++++----+-+---+-+-+-+--+-++--+-+++--+++-+----+-+-+-+++-+++--+-++-++++++---++--+-++-++---+-+-++--+-+-----++---+-+-+--+++--++---++--+-+++-++++-+++-+-+--+++-+-+----+-++++-+--+++----+++-------+-++-+-+-++++-++++---++-+---+-++-----+-++++----+++-++++--+--+-----+-++++----++++-+++-+---+---+-+-++++-++---+-++-+-+-+++-+-+--+-----++-+++---+-++---+++-++-+--+++++------++---+-++++-+-+-+--++++-++++-+--+++-++---+-----++-+-++-+-+++--++-+-+-++-++-----+-++--+--+--+-------++--+-++-+--++-++-++--+-+-++-+-+++-++++-+---+--+++--++--+-+++++-+-----++--++--+++--++-+---++----+--+-+--++-++---+++++++-+--+-++---+----+-+-+--+-+-+--++++-++--+--+-+---+++-+++++++-++-+-----+--++------+-++++++--++-++-+---+-++---++-++------+-++--+-++-+++--+++-+++-+-+--+-+--+--+---+-+-+-+--+-++-+-++---+++-+-+-++--+-++-+---++--+-+--++-+++-+--+++---+----+--++-++++++-++-+----+++-+-+--+++-----+---+--++-+--+-++++++-+-+++--+++---+-+-++++-++-+-+----++++----+++-++----+---++-+---++-+-+-++--+++---+--+++----++-++-+++--+--+---+++--+--+--+--+--++++-++++---+-+-+--+-+-+--++++--+-+--++--++++----++-++++++-+--+-+------+-----+++----++-+++++-+--+--+---++-+-++-+--++++-+++---+++-+----+--+++++-+-+--+++--++-+++-+-++---++-++-+-+-+--+-++--+---+-+++--+++++-----+-++-+-+++-+-+-------++++---+-+-++-+--+++++---+--++-+-++-+++----+++-++++---++------+-+---++++--+-+---+++------++++++---++-+----+-+++-+--++-+-+-+-----+-++-++-++--++-+-+-++++++--++---+-+-+-+-+-+-++-++-++----++--++-+++-++---+++--+++---+++--+-+++----++--+-+-+++---++---++-+--+++++-+---++----++--+++-+--+-+++++++-+--+---+--+---+----+-++-++-+--++--+--++-++---+++++--+-+---+-++-+-+----+++-++-+-+--+---+-++-+-----++---++++--+++++-+---+-++--+-+-+----+--++++-+-----++++--++-+-+++++----+++---+++++++--+---+--+--++++--+++-----+-++--+-+-----+++++----+-++++---+-++--+-++-+++--+++-+-+++++--+----++--+--+-+-++-----++-+--++--++++++-+-+++----++++---++-+--+-+------+-+--+++++--+++--++-----+--++-++-+++++-++-------+----++-++--+--++--++++-++---+-+++++----+-++-++---+++---+-++-++----++--++--+++++-+--+-----+-+-+-+-+++-+--++-+-+++--+-+-+++-+-++--+-+-+-+--+-+-+++++---+---+-++-+---++-+-++-+-+++-++-++-+-++-------++---+-++-++++-++--++--+-++-+++---++++--+----+---+-++-+++--+-+++---+-++-++----+--+--+-++--+-++-++++++--+-++-+--+---+-+--+-+--++---+--+-++--++--+--++-++++----+--+--+++-+++-+-+-++--++-+-+---+-+-------+--++++++-++++++-++-+-++-+---+--+-+-++--+++---+----+--+--+-++----+-+-++-++-++-+++--++---++-------+++++--+-+++++++--+--+-+--++--++--++-+--+--+++----+++++-++-------++---+-+--++-++--+++-+-+-+-+------+-+--+++++-+-+--++-++-++--+++++++---+-++--+++-+++--++++-++--+-+---+----+----+---+--+-+++-+-+++++---+--++--+-+++-+++++--+---+-+++++-+---++++--+-++----+---++----+++---+++++-+-++--+--+-++-++----+---++-++-+-+-+---+++-++-+++-+---+++--+-+-----++-+---++-+---++---+-++--++++-+--++-+-++----+-+-+--++--++++--+--++--+--+-+-+++++++--++-+-+-+++--+---+++--++++++--+-+-----+---++-+++--+++--++---+++--+--+-++++-----+++-----++++--++--+-+-- 
Events occurred: 3061 
Queued packets: 3061 
Dropped packets: 0 
Processed packets: 1464 
Filtered (per source) 58 48 53 51 47 39 45 42 53 52 57 50 63 43 49 57 45 58 40 42 56 54 58 52 44 53 61 41 50 33 51 52 
Total filtered: 1597 
Idle cycles:  26408166 

第一行(++-+++++++--+++-++++-++-+++---+ ...)是表示source_enabled[]標誌有效變化數的速記符號。

你可以看到,在這個速度下,隊列不飽和,消費者線程有很多空閒週期。

演示Live On Coliru

完全參考上市:你用這個爲

#include <boost/lockfree/spsc_queue.hpp> 
#include <boost/atomic.hpp> 
#include <boost/random.hpp> 
#include <boost/bind.hpp> 
#include <thread> 
static constexpr uint8_t NUM_SOURCES = 32; 

namespace queueing { 
    using data_t = std::vector<char>; // just for demo 

    struct spsc_queue_item { 
     uint8_t ID; 
     data_t data; 
    }; 

    // process control 
    boost::atomic_bool shutdown_flag { false }; 

    namespace statistics { 
     namespace events { 
      boost::atomic_size_t occurred { 0 }; 
     } 

     namespace packets { 
      boost::atomic_size_t queued { 0 }; 
      boost::atomic_size_t dropped { 0 }; 
      boost::atomic_size_t processed { 0 }; 
      boost::atomic_size_t skipped[NUM_SOURCES] = {}; 
     } 

     boost::atomic_size_t idle_cycles { 0 }; 

     void report() { 
      namespace stats = queueing::statistics; 
      std::cout << "\n"; 
      std::cout << "Events occurred: " << stats::events::occurred << "\n"; 
      std::cout << "Queued packets: " << stats::packets::queued << "\n"; 
      std::cout << "Dropped packets: " << stats::packets::dropped << "\n"; 
      std::cout << "Processed packets: " << stats::packets::processed << "\n"; 

      std::cout << "Filtered (per source) "; 
      std::copy(std::begin(stats::packets::skipped), std::end(stats::packets::skipped), 
        std::ostream_iterator<size_t>(std::cout, " ")); 
      std::cout << "\n"; 

      auto total_filtered = std::accumulate(std::begin(stats::packets::skipped), std::end(stats::packets::skipped), 0ul); 
      std::cout << "Total filtered: " << total_filtered << "\n"; 
      std::cout << "Idle cycles:  " << stats::idle_cycles  << "\n"; 
     } 
    } 

    // business logic 
    boost::atomic_bool source_enabled [NUM_SOURCES] = {}; // true:started (process)/false:stopped (skip) 
    boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue; 
} 

void producer_thread() { 
    using namespace boost; 
    namespace stats = queueing::statistics; 
    // generate random data packets or start/stop filter events 

    using queueing::spsc_queue_item; 

    mt19937 engine; 
    auto gen_srce = bind(uniform_int<uint8_t>(0, NUM_SOURCES-1), ref(engine)); 
    auto gen_data = [&] { 
     std::vector<char> v; 
     std::generate_n(back_inserter(v), engine()%1024, bind(uniform_int<uint8_t>{}, ref(engine))); 
     return v; 
    }; 
    enum kind_t { knd_data, knd_start, knd_stop }; 
    auto gen_kind = bind(uniform_int<uint8_t>(knd_data, knd_stop), ref(engine)); 

    queueing::data_t const empty {}; 

    // 
    struct event_t { kind_t kind; spsc_queue_item item; }; 
    auto gen_event = [&] { 
     auto kind = static_cast<kind_t>(gen_kind()); 
     return event_t { 
      kind, 
      spsc_queue_item { 
       gen_srce(), 
       kind == knd_data? gen_data() : empty 
      } 
     }; 
    }; 

    // now that we can easily generate queue items, let's do so in a loop 
    while (!queueing::shutdown_flag) { 
     auto evt = gen_event(); 

     std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400)); 

     switch(evt.kind) { 
      case knd_data: 
       stats::events::occurred++; 

       if (queueing::shared_queue.push(evt.item)) { 
        stats::packets::queued++; 
       } else { 
        stats::packets::dropped++; 
       } 
       break; 
      case knd_start: 
       { 
        bool expected = false; 
        if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, true)) 
         std::cout << "+";// << static_cast<int>(evt.item.ID); 
       } 
       break; 
      case knd_stop: 
       { 
        bool expected = true; 
        if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, false)) 
         std::cout << "-";// << static_cast<int>(evt.item.ID); 
       } 
       break; 
     } 
    } 
} 

void fake_process(queueing::spsc_queue_item const& item) { 
    // pretend it takes time proportional to the amount of data 
    std::this_thread::sleep_for(std::chrono::microseconds(item.data.size())); 

    queueing::statistics::packets::processed++; 
} 

void consumer_thread() { 
    namespace stats = queueing::statistics; 

    queueing::spsc_queue_item item; 

    auto consume_pending = [&] { 
     while (queueing::shared_queue.pop(item)) { 
      if (queueing::source_enabled[item.ID]) 
       fake_process(item); // if filtering started, process 
      else 
       stats::packets::skipped[item.ID]++; // if filtering stopped, skip 
     } 
    }; 

    while (!queueing::shutdown_flag) { 
     consume_pending(); 
     stats::idle_cycles++; 
    } 

    consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag 
} 

#include <cassert> 

int main() { 
    using namespace std; 

    // check no source_enabled flags are set at start 
    assert(0 == count(begin(queueing::source_enabled), end(queueing::source_enabled), true)); 

    auto producer = thread(producer_thread); 
    auto consumer = thread(consumer_thread); 

    this_thread::sleep_for(chrono::seconds(1)); 
    queueing::shutdown_flag = true; 

    if (producer.joinable()) producer.join(); 
    if (consumer.joinable()) consumer.join(); 

    queueing::statistics::report(); 
} 
+0

內置的註釋 – Portisch

+0

更新了答案。現在有一個功能[現場演示](http://coliru.stacked-crooked.com/a/1b0f44cbc5e932cd) – sehe

+0

我記錄的編碼,測試和重構實時流在這裏:https://www.livecoding。電視/視頻/ lockfree隊列過濾與 - 32個通道/ – sehe