2013-07-15 82 views
1

我的服務器應用程序有一個奇怪的問題。我的系統很簡單:我有1個以上的設備和一個通過網絡進行通信的服務器應用程序。協議具有可變長度的二進制包,但具有固定標頭(包含有關當前包大小的信息)。數據包示例:EOF boost :: async_read thread_pull和boost 1.54

char pct[maxSize] = {} 
pct[0] = 0x5a //preambule 
pct[1] = 0xa5 //preambule 
pct[2] = 0x07 //packet size 
pct[3] = 0x0A //command 
... [payload] 

該協議建立在命令回答的原則基礎上。

我使用boost :: ASIO用於通信 - io_service對象與拉線(4個線程)+異步讀/寫操作(下面的代碼示例)並創建一個「查詢週期」 - 每個200毫秒計時器由:

  • 查詢從設備中的一個值
  • 獲取結果,查詢第二個值
  • 獲取結果,啓動定時器再次

這項工作非常出色升壓1.53(Debug和Release)。但後來我改用1.54(特別是在釋放模式下)的魔法開始。我的服務器成功啓動,連接到設備並啓動「查詢週期」。大約30-60秒,一切正常(我收到數據,數據是正確的),但是然後我開始在最後一個讀取句柄上接收asio :: error(總是在一個地方)。錯誤類型:EOF。收到錯誤後,我必須斷開設備連接。

一段時間的谷歌搜索給我關於EOF的信息表明另一側(在我的情況下設備)啓動斷開程序。但是,根據設備的邏輯,它不是真的。 可能有人解釋發生了什麼事?可能是我需要設置一些套接字選項或定義?我看到兩個可能的原因:

  • 我的方面init斷開(有一些原因,我不知道)和EOF是這個行動的答案。
  • 某些套接字超時觸發。

我的環境:

  • 操作系統:Windows 7/8
  • 編譯器:MSVC 2012更新3

主 「查詢週期」 的示例代碼。改編自官方boost chat example所有代碼都簡化了減少空間:)

  • SocketWorker - 對於插座低水平包裝
  • DeviceWorker - 類設備通信
  • ERES - 錯誤店面內部結構
  • ProtoCmd和ProtoAnswer - 用於原始數組命令和回答的包裝(chat_message 模擬從boost chat example
  • lw_service_proto命名空間 - 預定義命令和數據包的最大大小

因此,代碼示例。插座包裝:

namespace b = boost; 
namespace ba = boost::asio; 

typedef b::function<void(const ProtoAnswer answ)> DataReceiverType; 

class SocketWorker 
{ 
private: 
    typedef ba::ip::tcp::socket socketType; 
    typedef std::unique_ptr<socketType> socketPtrType; 
    socketPtrType devSocket; 
    ProtoCmd  sendCmd; 
    ProtoAnswer rcvAnsw; 

    //[other definitions] 

public: 

//--------------------------------------------------------------------------- 
ERes SocketWorker::Connect(/*[connect settings]*/) 
{ 
    ERes res(LGS_RESULT_ERROR, "Connect to device - Unknow Error"); 

    using namespace boost::asio::ip; 
    boost::system::error_code sock_error; 

    //try to connect 
    devSocket->connect(tcp::endpoint(address::from_string(/*[connect settings ip]*/), /*[connect settings port]*/), sock_error); 

    if(sock_error.value() > 0) { 
     //[work with error] 
     devSocket->close(); 
    } 
    else { 
     //[res code ok] 
    } 

    return res; 
} 
//--------------------------------------------------------------------------- 
ERes SocketWorker::Disconnect() 
{ 
    if (devSocket->is_open()) 
    { 
     boost::system::error_code ec; 
     devSocket->shutdown(bi::tcp::socket::shutdown_send, ec); 
     devSocket->close(); 
    } 
    return ERes(LGS_RESULT_OK, "OK"); 
} 

//--------------------------------------------------------------------------- 
//query any cmd 
void SocketWorker::QueryCommand(const ProtoCmd cmd, DataReceiverType dataClb) 
{ 
    sendCmd = std::move(cmd); //store command 
    if (sendCmd .CommandLength() > 0) 
    { 
     ba::async_write(*devSocket.get(), ba::buffer(sendCmd.Data(), sendCmd.Length()), 
         b::bind(&SocketWorker::HandleSocketWrite, 
           this, ba::placeholders::error, dataClb)); 
    } 
    else 
    { 
     cerr << "Send command error: nothing to send" << endl; 
    } 
} 

//--------------------------------------------------------------------------- 
// boost socket handlers 
void SocketWorker::HandleSocketWrite(const b::system::error_code& error, 
                DataReceiverType dataClb) 
{ 
    if (error) 
    { 
     cerr << "Send cmd error: " << error.message() << endl; 
     //[send error to other place] 
     return; 
    } 

    //start reading header of answer (lw_service_proto::headerSize == 3 bytes) 
    ba::async_read(*devSocket.get(), 
        ba::buffer(rcvAnsw.Data(), lw_service_proto::headerSize), 
        b::bind(&SocketWorker::HandleSockReadHeader, 
          this, ba::placeholders::error, dataClb)); 
} 
//--------------------------------------------------------------------------- 
//handler for read header 
void SocketWorker::HandleSockReadHeader(const b::system::error_code& error, DataReceiverType dataClb) 
{ 
    if (error) 
    { 
     //[error working] 
     return; 
    } 

    //decode header (check preambule and get full packet size) and read answer payload 
    if (rcvAnsw.DecodeHeaderAndGetCmdSize()) 
    { 
     ba::async_read(*devSocket.get(), 
        ba::buffer(rcvAnsw.Answer(), rcvAnsw.AnswerLength()), 
        b::bind(&SocketWorker::HandleSockReadBody, 
          this, ba::placeholders::error, dataClb)); 
    } 
} 
//--------------------------------------------------------------------------- 
//handler for andwer payload 
void SocketWorker::HandleSockReadBody(const b::system::error_code& error, DataReceiverType dataClb) 
{ 
    //if no error - send anwser to 'master' 
    if (!error){ 
     if (dataClb != nullptr) 
      dataClb(rcvAnsw); 
    } 
    else{ 
     //[error process] 

     //here i got EOF in release mode 
    } 
} 

}; 

設備工人

class DeviceWorker 
{ 
private: 
    const static int LW_QUERY_TIME = 200; 
    LWDeviceSocketWorker sockWorker; 
    ba::io_service& timerIOService; 
    typedef std::shared_ptr<ba::deadline_timer> TimerPtr; 
    TimerPtr  queryTimer; 
    bool   queryCycleWorking; 

    //[other definitions] 
public: 

ERes DeviceWorker::Connect() 
{ 
    ERes intRes = sockWorker.Connect(/*[connect settings here]*/); 

    if(intRes != LGS_RESULT_OK) { 
     //[set result to error] 
    } 
    else { 
     //[set result to success] 

     //start "query cycle" 
     StartNewCycleQuery(); 
    } 

    return intRes; 
} 
//--------------------------------------------------------------------------- 
ERes DeviceWorker::Disconnect() 
{ 
    return sockWorker.Disconnect(); 
} 
//--------------------------------------------------------------------------- 
void DeviceWorker::StartNewCycleQuery() 
{ 
    queryCycleWorking = true; 
    //start timer 
    queryTimer = make_shared<ba::deadline_timer>(timerIOService, bt::milliseconds(LW_QUERY_TIME)); 
    queryTimer->async_wait(boost::bind(&DeviceWorker::HandleQueryTimer, 
             this, boost::asio::placeholders::error)); 
} 
//--------------------------------------------------------------------------- 
void DeviceWorker::StopCycleQuery() 
{ 
    //kill timer 
    if (queryTimer) 
     queryTimer->cancel(); 

    queryCycleWorking = false; 
} 
//--------------------------------------------------------------------------- 
//timer handler 
void DeviceWorker::HandleQueryTimer(const b::system::error_code& error) 
{ 
    if (!error) 
    { 
     ProtoCmd cmd;  
     //query for first value 
     cmd.EncodeCommandCore(lw_service_proto::cmdGetAlarm, 1); 
     sockWorker.QueryCommand(cmd, boost::bind(&DeviceWorker::ReceiveAlarmCycle, 
           this, _1));  
    } 
} 
//--------------------------------------------------------------------------- 
//receive first value 
void DeviceWorker::ReceiveAlarmCycle(ProtoAnswer adata) 
{ 
    //check and fix last bytes (remove \r\n from some commands) 
    adata.CheckAndFixFooter(); 

    //[working with answer] 

    if (queryCycleWorking) 
    { 
     //query for second value 
     ProtoCmd cmd; 
     cmd.EncodeCommandCore(lw_service_proto::cmdGetEnergyLevel, 1); 
     sockWorker.QueryCommand(cmd, b::bind(&DeviceWorker::ReceiveEnergyCycle, 
             this, _1)); 
    } 
} 
//--------------------------------------------------------------------------- 
//receive second value 
void DeviceWorker::ReceiveEnergyCycle(ProtoAnswer edata) 
{ 
    //check and fix last bytes (remove \r\n from some commands) 
    edata.CheckAndFixFooter(); 

    //[working with second value] 

    //start new "query cycle" 
    if (queryCycleWorking) 
     StartNewCycleQuery(); 
} 

}; 

任何想法,歡迎:)

編輯: 幾個測試後,我看到anower圖片:

  • 這個問題僅在boost 1.54上重現(調試和釋放模式,釋放 - 更多更快),以提升1.53沒有更多的錯誤(也許我不好清理我的代碼,然後重建第一時間....)
  • 與提升1.54和1個線程(而不是4)所有的工作以及

我也花一些時間與調試器和升壓源,使一些結論:

  • 當我收到EOF我的數據已經完全接收。
  • 這EOF表明什麼在這次行動中轉移,即插座結果標誌爲0時(沒有錯誤),但提高操作標誌,如果EOF(傳輸的字節== 0)

在這一刻我強制打開提升1.53 ...

+1

我承認我沒有深入研究問題的描述......但一開始,緩衝區的一生對我來說是可疑的。特別是,你發送'buffer(cmd.Data(),cmd.Length())' - 其中'cmd'是一個本地對象,即緩衝區顯然不會超過async.operation。同樣的,'rcvAnsw'在什麼地方被定義? –

+0

@IgorR。 我的不好,很抱歉:) SocketWorker中定義了一個命令和一個答案的本地對象,因此它將保留所有的異步操作時間。 但是對於本地「cmd」是一個很好的問題。出於某種原因,我認爲緩衝區使發送數據的副本。嘗試保存命令本地... PS:添加本地命令來源在主帖 – ShaKeSPeaR

+1

不,緩衝區()免費函數不會複製並且不擁有底層緩衝區,它只是適應' ConstBufferSequence'(或'MutableBufferSequence')概念。 http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.buffer_invalidation –

回答

0

我有完全相同的問題,我敢肯定,這是提升的一個bug :: ASIO 1.54.0

Here的錯誤報告。

該解決方案實際上回到1.53,儘管在bug報告頁面有1.54的補丁可用。

+0

謝謝,已經回到1.53。我希望補丁包含在1.55 :) – ShaKeSPeaR

+0

只是擡頭即將到來的[boost 1.55](http://www.boost.org/users/history/version_1_55_0.html)。看起來像它找到並修復與io_service +多線程池的Windows錯誤:) – ShaKeSPeaR

0

如果您的應用程序可以正常工作,但調用io_service::run()的單個線程卻失敗並且出現四個線程,您很可能會遇到競爭情況。這種類型的問題很難診斷。一般來說,您應該確保您的devSocket至多有一個未完成的async_read()async_write()操作。您當前執行的SocketWorker::QueryCommand()無條件調用async_write()可能違反順序假設documented這樣

此操作在零或多次調用方面落實到 流的async_write_some功能,被稱爲一個組成 操作。該程序必須確保該流不會執行寫入操作(例如async_write,流的async_write_some 函數或執行寫入的任何其他組合操作),直到該操作完成爲 。

該問題的classic solution是維護傳出消息隊列。如果先前的寫入未完成,請將下一個傳出消息附加到隊列中。當前一次寫入完成時,爲隊列中的下一條消息啓動async_write()。當使用多線程調用io_service::run()時,您可能需要使用鏈作爲鏈接的答案。

+0

感謝您的回答。我知道「在時間上對套接字進行一次異步操作」的問題,而我目前的設計則是通過手動控制查詢週期來避免這種情況。 'timer end-write cmd-read cmd -....- start timer')。 但無論如何我試圖用'strand'重寫我的代碼,但沒有運氣 - 問題仍然存在。 我強制暫時切換到1.53。 – ShaKeSPeaR

+0

我也嘗試用'\ r \ n'參數重寫代碼''async_read_until'(僅用於刪除我的處理程序)。但結果是一樣的 - 我得到'EOF'和'byte_transferred = 0',儘管我的緩衝區已經處理了正確的數據包,也就是說它已經被傳輸並且字節計數超過了0 .... – ShaKeSPeaR

+0

我剛剛花了週末時間調試一個類似問題與提升1.54。我找不到任何代碼錯誤。安裝1.55,並沒有再次看到這個問題(它發生在我每次運行特定的測試,運行相同的測試超過100次,現在升壓1.55,並沒有看到問題)。 – regu

相關問題