2015-04-01 44 views
1

我試圖實現由我的C++代碼控制的硬件設備的同步操作。確保boost :: deadline_timer不會接受新的等待,除非先前的等待已過期

假設有兩種類型的設備在那裏我可以執行Open/Close。 我需要實現的是爲Specified Duration打開一種類型的設備。第二種設備也是如此。

我已經寫代碼boost::deadline_timer

#include <boost/date_time/posix_time/posix_time.hpp> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 


class Test : public std::enable_shared_from_this <Test> 
{ 
public: 
    Test() :io_(), timerOne_(io_),timerTwo_(io_){} 
    void Open(int num); 
    void Close(int num); 
    void TimedOpen(int num, int dur); 
    void Run(); 
private: 
    boost::asio::io_service io_; 
    boost::asio::deadline_timer timerOne_; 
    boost::asio::deadline_timer timerTwo_; 
}; 

void Test::Open(int type) 
{ 
    std::cout << "Open for Number : " << type << std::endl; 
} 

void Test::Close(int type) 
{ 
    std::cout << "Close for Number : " << type << std::endl; 
} 

void Test::TimedOpen(int type, int dur) 
{ 
    switch (type) 
    { 
    case 1: 
    { 
       io_.reset(); 
       auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1); 
       fn(type); 
       timerOne_.expires_from_now(boost::posix_time::seconds(dur)); 
       timerOne_.async_wait(std::bind(&Test::Close, shared_from_this(), type)); 
       Run(); 
       std::cout << "Function Exiting" << std::endl; 
       std::cout << "-----------------------------------------------" << std::endl; 
       return; 
    } 

    case 2: 
    { 
       io_.reset(); 
       auto fn = std::bind(&Test::Open, shared_from_this(), std::placeholders::_1); 
       fn(type); 
       timerTwo_.expires_from_now(boost::posix_time::seconds(dur)); 
       timerTwo_.async_wait(std::bind(&Test::Close, shared_from_this(), type)); 
       Run(); 
       std::cout << "Function Exiting" << std::endl; 
       std::cout << "-----------------------------------------------" << std::endl; 
       return; 
    } 

    } 

} 

void Test::Run() 
{ 
    boost::thread th(boost::bind(&boost::asio::io_service::run, &io_)); 
} 

int main() 
{ 
    auto t = std::make_shared<Test>(); 
    t->TimedOpen(1, 60); 
    t->TimedOpen(2, 30); 
    t->TimedOpen(1, 5); 
    t->TimedOpen(2, 2); 
    char line[128]; 
    while (std::cin.getline(line, 128)) 
    { 
     if (strcmp(line, "\n")) break; 
    } 
    return 0; 
} 

輸出爲:

Open for Number : 1 
Function Exiting 
----------------------------------------------- 
Open for Number : 2 
Function Exiting 
----------------------------------------------- 
Open for Number : 1 
Close for Number : 1 
Function Exiting 
----------------------------------------------- 
Open for Number : 2 
Close for Number : 2 
Function Exiting 
----------------------------------------------- 
Close for Number : 2 
Close for Number : 1 

對於timerOne_t->TimedOpen(1, 5)執行之前的動作它不會等待以前wait到即作爲即將到期t->TimedOpen(1, 60)被取消。

因此Close for Number : 1出現在輸出中,沒有等待t->TimedOpen(1, 60)

我想實現的是,如果multiple waits are encountered任何類型的timer,所有的操作應即如果鍵入排隊

t->TimedOpen(1, 60); 
t->TimedOpen(1, 10); 
t->TimedOpen(1, 5); 

應該爲60+10+5秒做TimedOpen Operation。目前它只有5秒。也應該是非阻塞的,即我不能使用wait() instead of async_wait()

我該如何實現它?

摘要: 我的要求是安排在boost::deadline_timer()即多個操作操作上,除非前面的等待已過期,將被排隊。

+0

IMHO一個解決方案是在'TimedOpen'方法積聚定時器持續時間('dur'),存儲所述第一'的時間TimedOpen'調用和使用'basic_deadline_timer :: expires_at'方法總結的初始時間和時間總結,而不是'basic_deadline_timer :: expires_from_now' – megabyte1024 2015-04-01 07:16:11

+0

@ megabyte1024積累不會給我..調度功能..假設我想確保順序操作..它不能確保 – 2015-04-01 07:45:15

+1

不是清除..我是否正確理解3個調用dead_line定時器處理程序的例子需要3個'TimeOpen'調用?如果是的話,那麼你需要建立你自己的操作隊列等等。'expires_from_now'取消所有待處理的異步等待。請參閱[幫助](http://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/reference/basic_deadline_timer.html)。 – megabyte1024 2015-04-01 08:31:28

回答

1

像在評論中提到的,你會希望每個「類型」有隊列。

讓我們將每個類型的隊列命名爲「會話」。

通過將單個隊列中的所有異步等待鏈接到一個strand¹上,您可以獲得有效的序列化(也可以避免隊列/會話同步)。

唯一棘手的是當沒有任何飛行中開始異步等待。不變的是,異步操作是在飛行中當且僅當!queue_.empty()

struct Session : std::enable_shared_from_this<Session> { 
    Session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {} 

    void Enqueue(int duration) { 
     auto This = shared_from_this(); 
     strand_.post([This,duration,this] { 
       std::cout << "t0 + " << std::setw(4) << mark() << "ms Enqueue for Number: " << type << " (dur:" << duration  << ")\n"; 
       This->queue_.push(duration); 
       if (This->queue_.size() == 1) 
        This->Wait(); 
      }); 
    } 

    private: 
    boost::asio::strand strand_; 
    boost::asio::deadline_timer timer_; 
    int type; 
    std::queue<int> queue_; 

    void Close() { 
     assert(!queue_.empty()); 
     std::cout << "t0 + " << std::setw(4) << mark() << "ms Close for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n"; 

     queue_.pop(); 
     Wait(); 
    } 
    void Wait() { 
     if (!queue_.empty()) { 
      std::cout << "t0 + " << std::setw(4) << mark() << "ms Open for Number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n"; 
      timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front())); 
      timer_.async_wait(strand_.wrap(std::bind(&Session::Close, shared_from_this()))); 
     } 
    } 
}; 

現在Test類變得簡單多了(其實它並不需要在所有爲「共享」,但我已經離開了詳細的衆所周知的練習留給讀者):

class Test : public std::enable_shared_from_this<Test> { 
    using guard = boost::lock_guard<boost::mutex>; 
public: 
    Test() : io_(), work_(boost::asio::io_service::work(io_)) { 
     io_thread = boost::thread([this] { io_.run(); }); 
    } 

    void TimedOpen(int num, int duration); 

    void Stop() { 
     { 
      guard lk(mx_); 
      if (work_) work_.reset(); 
     } 
     io_thread.join(); 
    } 

    ~Test() { 
     Stop(); 

     guard lk(mx_); 
     timers_ex_.clear(); 
    } 

private: 
    mutable boost::mutex mx_; 
    boost::asio::io_service io_; 
    boost::optional<boost::asio::io_service::work> work_; 
    std::map<int, std::shared_ptr<Session> > timers_ex_; 
    boost::thread io_thread; 
}; 

void Test::TimedOpen(int type, int duration) { 
    guard lk(mx_); 

    auto &session = timers_ex_[type]; 
    if (!session) session = std::make_shared<Session>(io_, type); 

    session->Enqueue(duration); 
} 

正如你可以看到我已經

  • 推廣到任意數量的類型
  • 點進行的操作線程安全
  • 加入相對時間戳以毫秒爲單位,因爲t0
  • 固定在完全斷裂io_service壽命。現在,施工啓動服務。空閒時,work_變量保持活動狀態。
  • Stop()關閉它(首先排空會話隊列)。
  • 銷燬調用Stop()隱含

這是一個測試運行:

Live On Coliru

int main() { 
    auto t = std::make_shared<Test>(); 
    t->TimedOpen(1, 300); 
    t->TimedOpen(2, 150); 
    t->TimedOpen(1, 50); 
    t->TimedOpen(2, 20); 

    boost::this_thread::sleep_for(boost::chrono::milliseconds(400)); 
    std::cout << "================\n"; 
    t->TimedOpen(1, 50); 
    t->TimedOpen(2, 20); 
    t->TimedOpen(1, 300); 
    t->TimedOpen(2, 150); 

    t->Stop(); 
} 

打印

t0 + 0ms Enqueue for Number: 1 (dur:300) 
t0 + 0ms Open for Number : 1 (dur:300) (depth 1) 
t0 + 0ms Enqueue for Number: 2 (dur:150) 
t0 + 0ms Open for Number : 2 (dur:150) (depth 1) 
t0 + 0ms Enqueue for Number: 1 (dur:50) 
t0 + 0ms Enqueue for Number: 2 (dur:20) 
t0 + 150ms Close for Number : 2 (dur:150) (depth 2) 
t0 + 150ms Open for Number : 2 (dur:20) (depth 1) 
t0 + 170ms Close for Number : 2 (dur:20) (depth 1) 
t0 + 300ms Close for Number : 1 (dur:300) (depth 2) 
t0 + 300ms Open for Number : 1 (dur:50) (depth 1) 
t0 + 350ms Close for Number : 1 (dur:50) (depth 1) 
================ 
t0 + 400ms Enqueue for Number: 1 (dur:50) 
t0 + 400ms Open for Number : 1 (dur:50) (depth 1) 
t0 + 400ms Enqueue for Number: 2 (dur:20) 
t0 + 400ms Open for Number : 2 (dur:20) (depth 1) 
t0 + 400ms Enqueue for Number: 1 (dur:300) 
t0 + 400ms Enqueue for Number: 2 (dur:150) 
t0 + 420ms Close for Number : 2 (dur:20) (depth 2) 
t0 + 420ms Open for Number : 2 (dur:150) (depth 1) 
t0 + 450ms Close for Number : 1 (dur:50) (depth 2) 
t0 + 450ms Open for Number : 1 (dur:300) (depth 1) 
t0 + 570ms Close for Number : 2 (dur:150) (depth 1) 
t0 + 750ms Close for Number : 1 (dur:300) (depth 1) 

¹Why do I need strand per connection when using boost::asio?