我正在研究一個簡單的TCP服務器,它讀取並將消息寫入線程安全隊列。然後應用程序可以使用這些隊列安全地讀取和寫入套接字,即使是從不同的線程。加速asio異步讀取和使用隊列寫入套接字
我面臨的問題是我不能async_read
。我的隊列有pop
操作,它返回下一個要處理的元素,但是如果沒有可用的元素則會阻止它。所以一旦我呼叫POP async_read
回調當然不會被解僱了。有沒有一種方法可以將這樣的隊列整合到boost asio中,還是必須完全重寫?
下面是我用來展示我遇到的問題的一個簡短示例。一旦建立了TCP連接,我將創建一個新的線程,該線程將在該tcp_connection下運行該應用程序。之後我想開始async_read
和async_write
。我一直在這裏打破了我的頭幾個小時,我真的不知道如何解決這個問題。
class tcp_connection : public std::enable_shared_from_this<tcp_connection>
{
public:
static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) {
return std::shared_ptr<tcp_connection>(new tcp_connection(io_service));
}
boost::asio::ip::tcp::socket& get_socket()
{
return this->socket;
}
void app_start()
{
while(1)
{
// Pop is a blocking call.
auto inbound_message = this->inbound_messages.pop();
std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl;
this->outbound_messages.push(inbound_message);
}
}
void start() {
this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this());
boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
// Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks
// empty() is also available to check whether the queue is empty.
// So how can I async write without blocking the read.
// block...
auto message = this->outbound_messages.pop();
boost::asio::async_write(this->socket, boost::asio::buffer(message),
strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
void handle_read(const boost::system::error_code& e, size_t bytes_read)
{
std::cout << "handle_read called" << std::endl;
if (e)
{
std::cout << "Error handle_read: " << e.message() << std::endl;
return;
}
if (bytes_read != 0)
{
std::istream istream(&this->input_stream);
std::string message;
message.resize(bytes_read);
istream.read(&message[0], bytes_read);
std::cout << "Got message: " << message << std::endl;
this->inbound_messages.push(message);
}
boost::asio::async_read_until(this->socket, this->input_stream, "\r\n",
strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/)
{
if (e)
{
std::cout << "Error handle_write: " << e.message() << std::endl;
return;
}
// block...
auto message = this->outbound_messages.pop();
boost::asio::async_write(this->socket, boost::asio::buffer(message),
strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
private:
tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service)
{
}
boost::asio::ip::tcp::socket socket;
boost::asio::strand strand;
boost::asio::streambuf input_stream;
std::thread app_thread;
concurrent_queue<std::string> inbound_messages;
concurrent_queue<std::string> outbound_messages;
};
class tcp_server
{
public:
tcp_server(boost::asio::io_service& io_service)
: acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001))
{
start_accept();
}
private:
void start_accept()
{
std::shared_ptr<tcp_connection> new_connection =
tcp_connection::create(acceptor.get_io_service());
acceptor.async_accept(new_connection->get_socket(),
boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error));
}
void handle_accept(std::shared_ptr<tcp_connection> new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}
start_accept();
}
boost::asio::ip::tcp::acceptor acceptor;
};
謝謝!我沒有想到我可以自己創建一個像這樣的處理程序。就像我使用'strand.wrap'這樣的lang應該可以正常工作。我有一個問題,但爲什麼我需要錯誤消息佔位符? –
@JohnSmith我的錯誤。你需要一個消息佔位符(std :: string&?),你希望將來自上一次不成功的io讀操作的任何錯誤代碼傳遞迴異步處理程序,以防消息隊列耗盡並且讀取錯誤。 –
是的,我現在已經實施了。在代碼中,它不是一個字符串,但用示例字符串演示是最簡單的。 Tommorow我會在另一個答案中發佈代碼,但我會保持你的接受。只爲那些想知道同樣事情的人。 –