2012-12-09 89 views
4

首先我問這個Running a function on the main thread from a boost thread and passing parameters to that function使用boost ::支持ASIO :: io_service對象::後()

所以現在我想這樣的:

下面是一個控制檯C++項目中,我完全模擬我的大項目

TestServicePost.cpp

#include "stdafx.h" 
#include "SomeClass.h" 


int _tmain(int argc, _TCHAR* argv[]) 
{ 
    SomeClass* s = new SomeClass(); 
    while(true) 
    { 
     s->update(); 
    } 
    return 0; 
} 

SomeClass.h

#include <boost/thread.hpp> 
#include <boost/asio.hpp> 
#include <queue> 

class ServiceNote 
{ 
public: 
    std::string getType() 
    { 
     std::stringstream typeSS; 
     typeSS << "LamasaTech.MultiWall.PostNote." << (NoteType.compare("Normal") == 0 ? "Node" : "Header") << "." << Shape << "." << Colour; 
     return typeSS.str(); 
    } 
    int Action; 
    int CNoteId;  
    std::string Colour; 
    int NoteId; 
    std::string NoteType; 
    int SessionId; 
    std::string Shape; 
    std::string Style; 
    std::string Text; 
    int X; 
    int Y; 
}; 

class SomeClass 
{ 
public: 
    SomeClass(); 
    ~SomeClass(); 
    void update(); 

private: 
    std::queue<ServiceNote> pendingNotes; 
    void addToQueue(ServiceNote sn); 
    void pollService(boost::asio::io_service* svc); 
    int getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId); 
    boost::thread servicePoller; 
}; 

SomeClass.cpp

#include "stdafx.h" 
#include "SomeClass.h" 
#include <boost/property_tree/ptree.hpp> 
#include <boost/property_tree/json_parser.hpp> 
#include <boost/asio/signal_set.hpp> 

#define POLL_SERVICE = 0; 
#define POLLING_WAIT_TIME 1000 
#define SAVE_SESSION_EVERY 1800000 

SomeClass::SomeClass() 
{ 
    boost::asio::io_service io_servicePoller; 
    io_servicePoller.run(); 
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, &io_servicePoller)); 
    /*boost::asio::io_service io_sessionSaver; 
    boost::asio::signal_set signalsSaver(io_sessionSaver, SIGINT, SIGTERM); 
    signalsSaver.async_wait(boost::bind(&boost::asio::io_service::stop, &io_sessionSaver)); 
    sessionSaver = boost::thread(&SomeClass::saveSessionEvery, io_sessionSaver);*/ 
} 

SomeClass::~SomeClass() 
{ 
} 

void SomeClass::update() 
{ 
    while(!pendingNotes.empty()) 
    { 
     ServiceNote sn = pendingNotes.front(); 

     pendingNotes.pop(); 
    } 
} 

void SomeClass::addToQueue(ServiceNote sn) 
{ 
    pendingNotes.push(sn); 
} 

void SomeClass::pollService(boost::asio::io_service* svc) 
{ 
    int messageId = 1; 
    while(true) 
    { 
     if(boost::this_thread::interruption_enabled() && boost::this_thread::interruption_requested()) 
      return; 
     int currentId = messageId; 
     messageId = getMessage(svc, "49", messageId); 
     if(currentId == messageId) 
      boost::this_thread::sleep(boost::posix_time::milliseconds(POLLING_WAIT_TIME)); 
    } 
} 

int SomeClass::getMessage(boost::asio::io_service* svc, std::string sessionId, int messageId) 
{ 
    try 
    { 
     boost::asio::io_service io_service; 

     // Get a list of endpoints corresponding to the server name. 
     boost::asio::ip::tcp::resolver resolver(io_service); 
     boost::asio::ip::tcp::resolver::query query("mw.rombus.com", "http"); 
     boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); 

     // Try each endpoint until we successfully establish a connection. 
     boost::asio::ip::tcp::socket socket(io_service); 
     boost::asio::connect(socket, endpoint_iterator); 

     // Form the request. We specify the "Connection: close" header so that the 
     // server will close the socket after transmitting the response. This will 
     // allow us to treat all data up until the EOF as the content. 
     boost::asio::streambuf request; 
     std::ostream request_stream(&request); 
     request_stream << "GET " "/Service.svc/message/" << sessionId << "/" << messageId << " HTTP/1.0\r\n"; 
     request_stream << "Host: " << "mw.rombus.com" << "\r\n"; 
     request_stream << "Accept: */*\r\n"; 
     request_stream << "Connection: close\r\n\r\n"; 

     // Send the request. 
     boost::asio::write(socket, request); 

     // Read the response status line. The response streambuf will automatically 
     // grow to accommodate the entire line. The growth may be limited by passing 
     // a maximum size to the streambuf constructor. 
     boost::asio::streambuf response; 
     boost::asio::read_until(socket, response, "\r\n"); 

     // Check that response is OK. 
     std::istream response_stream(&response); 
     std::string http_version; 
     response_stream >> http_version; 
     unsigned int status_code; 
     response_stream >> status_code; 
     std::string status_message; 
     std::getline(response_stream, status_message); 
     if (!response_stream || http_version.substr(0, 5) != "HTTP/") 
     { 
      //std::cout << "Invalid response\n"; 
      return messageId; 
     } 
     if (status_code != 200) 
     { 
      //std::cout << "Response returned with status code " << status_code << "\n"; 
      return messageId; 
     } 

     // Read the response headers, which are terminated by a blank line. 
     boost::asio::read_until(socket, response, "\r\n\r\n"); 

     // Process the response headers. 
     std::string header; 
     std::string fullHeader = ""; 
     while (std::getline(response_stream, header) && header != "\r") 
      fullHeader.append(header).append("\n"); 

     // Write whatever content we already have to output. 
     std::string fullResponse = ""; 
     if (response.size() > 0) 
     { 
      std::stringstream ss; 
      ss << &response; 
      fullResponse = ss.str(); 
      try 
      { 
       boost::property_tree::ptree pt; 
       boost::property_tree::read_json(ss, pt); 
       ServiceNote sn; 
       sn.Action = pt.get<int>("Action"); 
       sn.CNoteId = pt.get<int>("CNoteId"); 
       sn.Colour = pt.get<std::string>("Colour"); 
       sn.NoteId = pt.get<int>("NoteId"); 
       sn.NoteType = pt.get<std::string>("NoteType"); 
       sn.SessionId = pt.get<int>("SessionId"); 
       sn.Shape = pt.get<std::string>("Shape"); 
       sn.Style = pt.get<std::string>("Style"); 
       sn.Text = pt.get<std::string>("Text"); 
       sn.X = pt.get<int>("X"); 
       sn.Y = pt.get<int>("Y"); 
       svc->post(boost::bind(&SomeClass::addToQueue, this, sn)); 
       //pendingNotes.push(sn); 
      } 
      catch (std::exception const& e) 
      { 
       std::string test = e.what(); 
       //std::cerr << e.what() << std::endl; 
      } 
      messageId++; 
     } 

     // Read until EOF, writing data to output as we go. 
     std::string fullSth = ""; 
     boost::system::error_code error; 
     while (boost::asio::read(socket, response, 
       boost::asio::transfer_at_least(1), error)) 
     { 
      std::ostringstream ss; 
      ss << &response; 
      fullSth = ss.str(); 
     } 
     if (error != boost::asio::error::eof) 
      throw boost::system::system_error(error); 
    } 
    catch (std::exception& e) 
    { 
     std::string test = e.what(); 
     std::cout << "Exception: " << e.what() << "\n"; 
    } 
    return messageId; 
} 

,但我得到Unhandled exception at 0x771215de in TestServicePost.exe: 0xC0000005: Access violation writing location 0xcccccce4.,之後該行執行:

svc->post(boost::bind(&SomeClass::addToQueue, this, sn)); 

我不能確定io_service對象作爲一個類的成員,所以我可以在析構函數中使用它~SomeClass(),也將不勝感激幫助

如果io_service.post不是我的最佳解決方案,請推薦一些東西,你可以看到我有一個構造函數,d構造函數和更新方法誰被調用每個刻度,我試着使用這個和隊列單獨但它不是線程安全的,有沒有一個簡單的線程安全FIFO使用?

回答

2

我想通了如何聲明io_service對象的類成員:

boost::shared_ptr<boost::asio::io_service> io_servicePoller; 

,並在構造函數中,我做了以下內容:

SomeClass::SomeClass() 
{ 
    boost::shared_ptr<boost::asio::io_service> io_service(
     new boost::asio::io_service 
    ); 
    io_servicePoller = io_service; 
    servicePoller = boost::thread(boost::bind(&SomeClass::pollService, this, io_servicePoller)); 
} 

一些清理

SomeClass::~SomeClass() 
{ 
    servicePoller.interrupt(); 
    io_servicePoller->stop(); 
    servicePoller.join(); 
} 

,並在更新我稱之爲運行,它添加到隊列中的東西,然後在while循環中讀取它們

void SomeClass::update() 
{ 
    io_servicePoller->run(); 
    io_servicePoller->reset(); 
    while(!pendingNotes.empty()) 
    { 
     ServiceNote sn = pendingNotes.front(); 

     pendingNotes.pop(); 
    } 
} 

,並改變了我的會員簽名void SomeClass::pollService(boost::shared_ptr<boost::asio::io_service> svc)

那麼什麼情況是:

  1. 應用程序啓動時
  2. inits我的課
  3. 我的類使得服務和啓動線程
  4. 線程從服務提取項目
  5. 主線程檢查IO服務隊列和exuted它
  6. 那麼它使用的隊列

由於伊戈爾·R.我不能做它沒有他

http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=4在那裏我得到如何使共享指針

+0

但是這樣做不太好。如果你堅持以這種方式使用'io_service :: run()'(即僅處理io_service隊列中當前可用的項並退出),則必須先調用'io_servicePoller-> reset()',否則'運行'只會第一次工作。 –

+0

它工作正常,每次 –

+0

但我添加了io_servicePoller-> reset();在io_servicePoller-> run()之後;無論如何,因爲我尊重你,我相信你比我更有經驗 –

4

SomeClass構造你確實做到以下幾點:

  1. 定義本地io_service實例。
  2. 呼叫它的run()成員函數,which returns immediately,因爲io_service沒有工作。
  3. 本地對象的地址傳遞給另一個線程。

這肯定不行。

請注意,io_service::run()是一種「消息循環」,所以應該阻塞調用線程。不要在對象構造函數中調用它。

+0

你告訴我我做錯了什麼,但我怎麼能做到這一點? –

+0

謝謝,我解決了它我現在會發布我的解決方案:) –

+0

@Shereef使另一個成員函數,給''工作','io_service'和調用'run'作爲最後一個語句(它會阻止):void run (){io_service io_servicePoller; io_service :: work work(io_servicePoller); servicePoller = thread(bind(&SomeClass :: pollService,this,&io_servicePoller)); io_servicePoller.run(); }' –

相關問題