2014-12-24 43 views
0

我目前正在使用一個小型servlet通過TCP發送模擬數據,使用boost :: asio作爲網絡部分。我設法在我的機器上的兩個進程之間獲得通信(簡單的客戶端是用Python編寫的)。問題在於相同的數據不斷通過套接字發送,而不是被更新。Boost :: ASIO多線程寫入過時數據到套接字?

我使用兩個線程:一個運行模擬,創建數據,並使用當前數據更新服務器的連接對象。第二個運行服務器,每隔一段時間都會將當前數據寫入套接字。我已經創建了一個最小的例子在這裏與你分享(它使用MSVC++ 12.0進行編譯,並且如果你喜歡複製,就有我在說的問題)。

tcp_server * server; 
bool connected = false; 

void runServer() { 
    try 
    { 
     boost::asio::io_service io_service; 
     server = new tcp_server(io_service); 

     connected = true; 
     io_service.run(); 
    } 
    catch (std::exception& e) 
    { 
     std::cerr << e.what() << std::endl; 
    } 
} 

void runSim() { 
    for (int i = 0; i < 1000; i++) { 
     if (connected) 
      server->setData("Current Message: " + std::to_string(i)); 

     boost::this_thread::sleep(boost::posix_time::seconds(1)); 
    } 
} 

int _tmain(int argc, _TCHAR* argv[]) 
{ 
    boost::thread serverThread(runServer); 
    boost::thread simThread(runSim); 

    simThread.join(); 
    serverThread.join(); 

    return 0; 
} 

這裏有兩個類,TCP_Connection和TCP_Server。這些非常接近地複製boost網站上的boost :: asio教程中找到的那些。

class tcp_connection 
    : public boost::enable_shared_from_this<tcp_connection> 
{ 
public: 
    typedef boost::shared_ptr<tcp_connection> pointer; 

    static pointer create(boost::asio::io_service& io_service) 
    { 
     return pointer(new tcp_connection(io_service)); 
    } 

    tcp::socket& socket() 
    { 
     return socket_; 
    } 

    void start() 
    { 
     message_ = make_daytime_string(); 

     boost::asio::async_write(socket_, boost::asio::buffer(message_), 
      boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    } 

    void setData(std::string msg) { 
     boost::unique_lock<boost::shared_mutex> msgLock(msgMutex, boost::try_to_lock); 
     if (msgLock.owns_lock()) { 
      message_ = msg; 
     } 
    } 

private: 
    tcp_connection(boost::asio::io_service& io_service) 
     : socket_(io_service) 
    { 
     timer_ = new boost::asio::deadline_timer(io_service,boost::posix_time::milliseconds(250)); 
    } 

    void handle_write() 
    { 
     boost::shared_lock<boost::shared_mutex> msgLock(msgMutex); 
     std::cout << "Writing to socket: " << message_ << std::endl; 
     boost::asio::write(socket_, boost::asio::buffer(message_));  

     timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500)); 
     timer_->async_wait(boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    } 

    tcp::socket socket_; 
    std::string message_; 
    int counter_; 
    boost::asio::deadline_timer * timer_; 
    boost::shared_mutex msgMutex; 
}; 

class tcp_server 
{ 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : acceptor_(io_service, tcp::endpoint(tcp::v4(), 13)) 
    { 
     start_accept(); 
    } 

    void setData(std::string msg) { 
     if (current_connection != NULL) { 
      current_connection->setData(msg); 
     } 
    } 

private: 
    void start_accept() 
    { 
     tcp_connection::pointer new_connection = 
      tcp_connection::create(acceptor_.get_io_service()); 

     acceptor_.async_accept(new_connection->socket(), 
      boost::bind(&tcp_server::handle_accept, this, new_connection, 
      boost::asio::placeholders::error)); 

     current_connection = new_connection; 
    } 

    void handle_accept(tcp_connection::pointer new_connection, 
     const boost::system::error_code& error) 
    { 
     if (!error) 
     { 
      new_connection->start(); 
      std::cout << "New Connection on 127.0.0.1" << std::endl; 
     } 

     start_accept(); 
    } 

    tcp::acceptor acceptor_; 
    tcp_connection::pointer current_connection; 
}; 

通過明智地使用std ::法院的,我已經成功地確定服務器線程從模擬線程獲取數據,並連接對象被傳遞一樣好(因爲使用setData( )方法正在被調用,當它應該)。無論出於何種原因,它看起來像連接的成員'message_'沒有被更新。我也知道連接沒有從控制檯的「新建連接」更新重置或重新創建。

+1

您似乎正在使用異步'boost :: asio :: async_write()'操作混合同步'boost :: asio :: write()'操作。同樣,爲什麼'async_wait()'在socket上寫入另一個數據流的完成處理程序? –

+0

嗯,我這樣做是因爲這是我可以找出如何在傳輸之間設置一段時間(async_wait位於具有指定週期的截止時間定時器)的唯一方法。我的想法是寫入重複發生,數據由另一個線程更新,這就是完成處理程序開始另一個寫入和等待的原因。 – gankoji

+0

我應該補充說我只添加了代碼來使用阻塞boost :: asio :: write()。 async_write()來自我用作此應用程序基礎的教程,如果需要或僅使用一個或另一個的理由,可以很容易地將其更改爲阻止寫入。 – gankoji

回答

1

好的,薩姆米勒在這裏得到了答案的功勞,但他把它作爲評論發佈,所以我現在正在回答關閉這個問題,我已經明白了。最終,該錯誤很可能是交錯寫入調用和訪問對象數據的問題。我重寫了我的示例代碼,僅使用Sam提供的其他答案中已經鏈接的準則,僅包含一個類(而不是上面的兩個)。我也讓所有的寫操作都是異步的。現在的代碼如下:

#include <iostream> 
#include <string> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <deque> 

using boost::asio::ip::tcp; 
using namespace std; 

class tcp_server { 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : _acceptor(io_service, tcp::endpoint(tcp::v4(), 5005)), _socket(io_service) 
    { 
     messages = std::deque<std::string> (1,"Hello from Jake's shitty server"); 

     timer_ = new boost::asio::deadline_timer(io_service, boost::posix_time::milliseconds(250)); 

     start_accept(); 
    } 

    void write(std::string message) { 
     boost::unique_lock<boost::shared_mutex> queueLock(queueMutex); 
     messages.push_back(message); 
     if (messages.size() <= 1) 
      handle_write(); 
    } 
private: 
    void start_accept() { 
     _acceptor.async_accept(_socket, 
      boost::bind(&tcp_server::handle_accept, this, 
      boost::asio::placeholders::error)); 
    } 

    void handle_accept(boost::system::error_code e) { 
     if (!messages.empty()) { 

      _message = messages.front(); 
      messages.pop_front(); 

      boost::asio::async_write(_socket, boost::asio::buffer(_message), 
       boost::bind(&tcp_server::handle_write, this)); 

     } 
    } 

    void handle_write() { 

     if (!messages.empty()) { 
      timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500)); 
      timer_->async_wait(boost::bind(&tcp_server::handle_accept, this, boost::asio::placeholders::error)); 
     } 

     return; 
    } 

    std::string _message; 
    std::deque<std::string> messages; 

    tcp::acceptor _acceptor; 
    tcp::socket _socket; 
    boost::asio::deadline_timer * timer_; 
    boost::shared_mutex queueMutex; 


}; 


tcp_server * server; 

void addMessages() { 
    for (int i = 0; i < 10; i++) { 
     server->write("New Message. Count: " + std::to_string(i) + ".\n"); 
    } 
} 


int _tmain(int argc, _TCHAR* argv[]) 
{ 
    boost::asio::io_service io_service; 
    server = new tcp_server(io_service); 

    server->write("Hey there sexy"); 
    boost::thread messenger(addMessages); 

    io_service.run(); 

    return 0; 
} 

TL; DR使用消息隊列,並且不要混合使用異步/同步寫入。

另外,我在處理這個時遇到了一個有趣的問題,那就是我使用從消息隊列中彈出的臨時字符串填充boost :: asio :: buffer。這使VS 2013的調試斷言失敗,說一個字符串迭代器是不可忽略的。一旦我將_message屬性添加到類中,並用它來構建緩衝區,一切都運行良好。在這裏發現小費:Expression: string iterator not dereferencable while using boost regex。感謝您的幫助山姆!

+1

很高興你知道了! –