我使用的boost :: ASIO和的boost ::線程實現它接受消息的消息服務,把他們異步如果沒有消息正在處理或隊列消息是否有消息正在處理。不正確使用升壓:: ASIO和boost ::線程
消息速率在我看來很高,約爲2.000消息每秒。有這麼多的消息,我面臨着損壞的消息,但很少。在2.000條消息中大約4-8損壞。我相信問題是由於不正確使用boost :: asio和/或boost :: thread庫引起的。
我執行的代碼主要基於this boost tutorial。我找不到一個錯誤,而且自從主要信息出現以後,發現我很難縮小這個問題的範圍。
也許別人有一個想法這裏發生了什麼問題?
基本上這個類是在使用方式如下:
(1)構造函數被調用我的程序的開頭,以啓動線程,因此服務,接受和傳遞消息
(2)每當我想傳輸一條消息時,我都會撥打MessageService::transmitMessage()
,將async_write
的任務委託給處理消息隊列的線程。
using namespace google::protobuf::io;
using boost::asio::ip::tcp;
MessageService::MessageService(std::string ip, std::string port) :
work(io_service), resolver(io_service), socket(io_service) {
messageQueue = new std::deque<AgentMessage>;
tcp::resolver::query query(ip, port);
endpoint_iterator = resolver.resolve(query);
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
this, boost::asio::placeholders::error, ++endpoint_iterator));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}
void MessageService::await() {
while (!messageQueue->empty()) {
signal(SIGINT, exit);
int messagesLeft = messageQueue->size();
sleep(3);
std::cout << "Pending Profiler Agents Messages: "
<< messageQueue->size() << std::endl;
if (messagesLeft == messageQueue->size()) {
std::cout << "Connection Error" << std::endl;
break;
}
}
std::cout << i << std::endl;
}
void MessageService::write(AgentMessage agentMessage, long systemTime,
int JVM_ID) {
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
agentMessage.set_jvm_id(JVM_ID);
agentMessage.set_systemtime(systemTime);
io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}
void MessageService::do_close() {
socket.close();
}
void MessageService::transmitMessage(AgentMessage agentMessage) {
++i;
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
}
void MessageService::do_write(AgentMessage agentMessage) {
bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);
if (!write_in_progress) {
transmitMessage(agentMessage);
}
}
void MessageService::handle_write(const boost::system::error_code &error) {
if (!error) {
messageQueue->pop_front();
if (!messageQueue->empty()) {
transmitMessage(messageQueue->front());
}
} else {
std::cout << error << std::endl;
do_close();
}
}
void MessageService::handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler interface
}
MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}
頭文件:
using boost::asio::ip::tcp;
class MessageService {
public:
MessageService(std::string ip, std::string port);
virtual ~MessageService();
void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
void await();
private:
boost::asio::io_service io_service;
boost::asio::io_service::work work;
tcp::resolver resolver;
tcp::resolver::iterator endpoint_iterator;
tcp::socket socket;
std::deque<AgentMessage> *messageQueue;
void do_write(AgentMessage agentMessage);
void do_close();
void handle_write(const boost::system::error_code &error);
void handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator);
void transmitMessage(AgentMessage agentMessage);
};
既然你提到'boost :: thread'我只能假設你在程序中使用了多個線程,但你沒有鎖定來確保訪問消息隊列(我假設這是唯一的共享資源)是安全的 - 我必須承認,我沒有經過代碼,所以也許訪問隊列是單線程的...你告訴我 –
我認爲只有一個線程處理消息,因此隊列。由於唯一的功能,至少我認爲它是這種方式,這是由多個線程調用transmitMessage和transmitMessage委託與:io_service.post(boost :: bind(&MessageService :: do_write,this,agentMessage));我認爲這隻能由構造函數中啓動的一個線程執行。但也許我錯了。 –
'transmitMessage'不是由多個線程調用的,你只有一個線程在io服務上調度。我看不出上面的代碼有什麼問題(只要你只從另一個線程調用'write' - 說主線程) - 我會檢查接收者代碼。 – Nim