2017-08-18 92 views
0

我正在嘗試使用boost單元測試來測試我的boost asio套接字偵聽器。監聽器的目的是簡單地監聽一個端口並讀取所有內容並將其保存到一個隊列中併發回一個http響應頭。即使長時間等待,C++線程也不會加入

作爲第一步,我創建了一個客戶端向偵聽器發送消息並讀取來自偵聽器的響應消息。我還創建了一個線程來啓動偵聽器。主線程將發送和接收來自Listener的消息。我可以在客戶端和列表程序之間發送和接收消息。但是當我嘗試加入時,它並沒有加入並繼續等待加入(我猜)。

請幫助我出了什麼問題。

在此先感謝。

Listener.hpp

#ifndef Listener_hpp 
#define Listener_hpp 
#include <iostream> 
#include <stdio.h> 
#include <atomic> 
#include <boost/asio.hpp> 
#include "sharedQueue.hpp" 

using namespace std; 
using namespace boost; 
using boost::asio::ip::tcp; 

class Listener{ 

private: 
int port; 
std::atomic<bool> m_stop; 
boost::system::error_code error; 
boost::asio::io_service io_service; 
std::shared_ptr<sharedQueue> queue_ptr; 
void saveMessageToQueue(std::string,std::string); 
Listener(int portToListen, std::shared_ptr<sharedQueue> queue_unique_ptr); 
Listener(const Listener&); 
Listener& operator=(const Listener&); 
public: 
static Listener& getInstance(int portToListenOn,   std::shared_ptr<sharedQueue> queue_unique_ptr); 
void listen(); 
}; 
#endif /* Listener_hpp */ 

Listener.cpp

#include "Listener.hpp" 
Listener::Listener(int portToListen, std::shared_ptr<sharedQueue> queue_unique_ptr):port(portToListen), queue_ptr(queue_unique_ptr),m_stop(false) 
{;} 
Listener& Listener::getInstance(int portToListenOn, std::shared_ptr<sharedQueue> queue_unique_ptr) 
{ 
    static Listener instance(portToListenOn, queue_unique_ptr); 
    return instance; 
} 
void Listener::stopListen(){ 
m_stop = true; 
} 
void Listener::listen() 
{ 
    try{ 
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port)); 
    while(!m_stop) 
    { 
     std::cout<<"Server is listening on port "<<port<<std::endl; 
     boost::asio::ip::tcp::socket socket(io_service); 
     acceptor.accept(socket); 
     std::cout<<"Connection Request Accepted "<<std::endl; 
     std::array<char, 512> buf; 
     socket.read_some(boost::asio::buffer(buf), error); 
     std::string responseBack =" HTTP:300"; 
     boost::asio::write(socket, boost::asio::buffer(responseBack, responseBack.size()), error); 
    } 
    } 
    catch (std::exception& e) 
    { 
    std::cerr <<"Listener Exception : "<< e.what() << std::endl; 
    exit(0); 
    } 
    } 

sharedQueue.hpp

#ifndef SharedQueue_hpp 
    #define SharedQueue_hpp 
    #include <stdio.h> 
    #include <iostream> 
    class sharedQueue{ 
    public: 
    sharedQueue(){}; 
    void pushToQueue(std::string str){}; 
    }; 
    #endif /* SharedQueue_hpp */ 

ClientTestHelper.hpp

#ifndef ClientTestHelper_hpp 
    #define ClientTestHelper_hpp 

    #include <iostream> 
    #include <boost/asio.hpp> 
    using namespace std; 
    class ClientTestHelper 
    { 
    string address = ""; 
    public: 
    ClientTestHelper(std::string addressToPush){ 
    this->address = addressToPush; 
    }; 
    ~ClientTestHelper(){}; 
    std::string sendMessage(string message); 
    }; 

    #endif /* ClientTestHelper_hpp */ 

ClientTestHelper.cpp

#include "ClientTestHelper.hpp" 
std::string ClientTestHelper::sendMessage(string message){ 
boost::asio::io_service io_service; 
boost::asio::ip::tcp::resolver resolver(io_service); 
boost::asio::ip::tcp::resolver::query query(address, boost::asio::ip::resolver_query_base::numeric_service); 
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); 
boost::asio::ip::tcp::socket socket(io_service); 
boost::asio::connect(socket, endpoint_iterator); 
boost::asio::streambuf writeBuffer; 
boost::asio::streambuf readBuffer; 
std::string str = message; 
boost::system::error_code error; 
boost::asio::write(socket, boost::asio::buffer(str, str.size()), error); 
std::array<char,1024> buf; 
socket.read_some(boost::asio::buffer(buf), error); 
std::string respo = buf.data(); 
cout<<respo<<std::endl; 
return respo; 
} 

ListenerTest.hpp

#ifndef ListenerTest_hpp 
#define ListenerTest_hpp 

#define BOOST_TEST_DYN_LINK 
#define BOOST_TEST_MAIN 

#include <iostream> 
#include <stdio.h> 
#include <thread> 
#include <boost/test/unit_test.hpp> 
#include "Listener.hpp" 
#include "ClientTestHelper.hpp" 
#include "SharedQueue.hpp" 

struct ListenerTestFixture{ 
public: 
void startListen(Listener &receiver){ 
    receiver.listen(); 
} 
std::shared_ptr<sharedQueue> myQueue = std::make_shared<sharedQueue>(); 
Listener &receiver = Listener::getInstance(8282, myQueue); 
void doRun(){ 
    receiver.listen(); 
} 
ClientTestHelper *client = new ClientTestHelper("8282"); 
ListenerTestFixture(){}; 
~ListenerTestFixture(){}; 
}; 
#endif /* ListenerTest_hpp */ 

ListenerTest.cpp

#include "ListenerTest.hpp" 

BOOST_FIXTURE_TEST_CASE(connectionTest, ListenerTestFixture) 
{ 
std::thread t1(&ListenerTestFixture::doRun, *this); 
std::string response = ""; 
response = client->sendMessage("SomeRandom Messages \r\n\r\n\r\n"); 
//receiver.~Listener(); 
receiver.stopListen(); 
std::cout<<"Response Received "<<response; 
t1.join(); 
std::cout<<"Finished Testing "<<endl; 
} 

控制檯輸出對於澄清:

Running 1 test case... 
Server is listening on port 8585 
Connection Request Accepted 
Server is listening on port 8585 
    HTTP:300 

在這個執行沒有停止之後等待加入。

編輯:修改後的監聽器類添加成員m_stop和stopListen(),更改爲(;;)成while(!m_stop)。包含頭文件。

+3

爲什麼你手動調用一個對象的析構函數? – Blacktempel

+1

爲了加入,線程函數必須退出。 「聽」會退出嗎? –

+0

請提供示例代碼SSCCE。由於錯誤 – sehe

回答

2

receiver.~Listener();不符合你的想法。 該行代碼無法工作,必須出來。

它所做的是導致在receiver的所有成員上調用析構函數。它不會(直接)中斷對listen()的呼叫。 可能的結果是執行listen()的線程將執行未定義的行爲,幾乎肯定意味着無法以通知join()的方式結束。

總之,您的主線程將永遠等待,因爲您已從receiver()下拉出地毯。

即使這種方式工作,編譯器也會在main()的末尾添加一個隱含的receiver.~Listener()調用,並且如果偶然的話它可能會崩潰。

正常的解決方案是將成員添加到Listenerstd::atomic<bool> m_stop;(或者,如果你喜歡,如果你正在使用升壓Boost當量)它初始化到false在構造函數(不listen()),然後調用成員稱爲stop()這在撥打join()之前立即將其設置爲true

然後您將listen()中的非終止循環替換爲while(!stop)

這樣,你的主線程就可以通知聽到循環結束時有序的結論。

沒有簡單的方法讓線程「正好」或「突然」停止乾淨。你無法預測它在某個過程中的意義或者它正在使用的資源。 它可能會在庫調用中損壞堆,操作系統,文件系統,任何東西。

另外Java有一個叫做stop()的線程方法,它在添加之後立即被棄用。沒有安全的方式來實施或使用它。


這是罕見的,你應該直接調用析構函數。我知道的唯一用例是你已經使用了放置位置new,並且不希望將空間返回到堆中。在現代C中更爲罕見,因爲像容器回收空間這樣的東西全都交給std容器,如std::vector<>

std::vector<>的大多數實現調用析構函數,因爲如果您調整矢量大小,它需要破壞對象但不重新分配內部存儲。

你幾乎不應該在靜態分配的對象上調用析構函數。

+0

Hi @Persixty,謝謝你的回覆。我已經實現你的邏輯。但問題仍然存在。據我所知,從控制檯輸出中,執行'listen()'方法的第二個線程正在等待接受新連接,即'已經在循環內部'。在執行這些代碼之後,只有這個線程檢查'm_stop'的值。這就是爲什麼通過將'm_stop'設置爲'false'來終止循環的原因。 – Kid

+0

@kid你看過我對OP的評論嗎? Boost庫中顯然有辦法迫使'等待'活動停止使用'cancel()'。你嘗試過嗎?它也似乎有辦法使對象無阻塞。正如我所說我不是Boost專家。 – Persixty

+0

對於非常晚的回覆感到抱歉,我爲了測試而有一個想法(由於時間限制)。 'BOOST_FIXTURE_TEST_CASE(connectionTest,pushMessageReceiverFixture) { std :: thread t1(&pushMessageReceiverFixture :: doRun,* this); std :: string response =「」; response = pushServer-> pushEmptyMessage(「SomeRandom Messages \ r \ n \ r \ n \ r \ n」); receiver。〜pushMessageReceiver(); std :: cout <<「收到的響應」<<響應; pushServer-> pushEmptyMessage(「停止收聽\ r \ n \ r \ n \ r \ n」); t1.join(); std :: cout <<「Finished Testing」<< endl; }' – Kid

0

,如果你正在運行ASIO功能,如

acceptor.accept(socket); 

,他們將阻止,除非他們已經完成或有錯誤或異常返回(除了自己的基礎套接字已被設置爲非阻塞模式!)。因此,如果m_stop已設置或沒有設置,則不會進行測試。

你需要取消stop()函數內的任何運行的操作,即:

void Listener::stopListen(){ 
    std::error_code _ignore; 
    m_stop = true; 
    m_listener.cancel(_ignore); // pass in an ec, it may throw otherwise 
    m_sock.cancel(_ignore); 
} 

當然,你需要使嵌入類裏面的IO對象(插座/受體)使它們在stopListen()內可到達。

但是,說實話,我強烈推薦編寫與asynchroneous模式的代碼,並使用

m_acceptor.async_accept(m_sock, [&](const std::error_code &ec) { 
    if (ec) { 
     return handle_error(ec, "accept"); // bail out 
    } 

    handle_new_client(std::move(m_sock)); // move out socket 
    // restart listen here 
}); 

課程的線程的方式在更多的潛在的競爭條件,這是有時很難拉正確處理。

克里斯科爾霍夫有一些echo server examples具有不同的接受方式從阻塞到異步,顯示工作模式。

乾杯, A.

S:read_some()將最有可能不是你想要的。它只是返回已經到達您的計算機TCP堆棧的前幾個字節。但是不能保證完整的請求可用,所以你需要多次調用async_read。如果協議是http,則asio :: async_read_until會更好,並使用「\ r \ n」分隔符。