2012-07-26 77 views
9

我使用boost::asio::io_service作爲基本線程池。一些線程被添加到io_service中,主線程開始發佈處理程序,工作線程開始運行處理程序,並且所有事情都完成了。到現在爲止還挺好;我通過單線程代碼獲得了很好的加速效果。使用Boost Asio設置郵政隊列大小限制?

但是,主線程有數以百萬計的事情發佈。它只是繼續發佈它們,比工作線程處理它們要快得多。我沒有達到內存限制,但將這麼多東西排入隊伍仍然很愚蠢。我想要做的是爲處理程序隊列設置固定大小,如果隊列已滿,則使用post()塊。

我在Boost ASIO文檔中看不到任何選項。這可能嗎?

回答

0

你可以使用strand對象來放置事件並延遲你的main?所有工作發佈後,您的計劃是否會退出?如果是這樣,你可以使用工作對象,這將讓你更好地控制你的io_service何時停止。

你總是可以主要檢查線程的狀態,並讓它等待,直到一個變得自由或類似的東西。

//鏈接

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

希望這會有所幫助。

+0

問題不在於'io_service'在完成工作之前停止---我們知道刪除'work'對象以使'io_service'正常停止。問題在於'io_service'允許累積太多的任務。我們希望以一種不涉及創建任務的線程輪詢的方式來限制未分配的任務的數量,因此我們關於是否可以阻止「poll()」的問題。 – uckelman 2012-07-28 21:34:17

2

我正在使用信號量修復處理程序隊列大小。下面的代碼說明了此解決方案:

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

你可以用你的拉姆達在另一個lambda這將需要數着「正在進行」任務的照顧,然後如果有太多正在進行的任務發佈前等待。

例子:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

輸出:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

也許嘗試降低主線程的優先級,以便一旦工作線程忙碌起來,他們捱餓主線程和系統的自我限制。