2016-07-07 98 views
0

因此,我正在寫一種示波器式的程序,它讀取計算機上的串行端口並對該數據執行fft以將其轉換爲頻譜。我遇到了一個問題,雖然我的程序的佈局分解成SerialHandler類(利用boost::Asio),FFTHandler類和main函數。 SerialHandler類使用boost::Asio`` async_read_some函數從端口讀取並引發一個名爲HandleOnPortReceive的事件,然後它自己讀取數據。C++信號量混亂?

問題是我找不到將事件處理程序中的數據從另一個線程上的io_service對象傳遞到另一個線程上的FFTHandler類的方法。我被推薦使用信號量來解決我的問題,但是我對semaphore.h的用法幾乎一無所知,所以我的實現現在相當破碎,並且沒有做任何事情。

下面是一些代碼,如果這使得它更清晰一點:

using namespace Foo; 
//main function  
int main(void){ 
    SerialHandler serialHandler; 
    FFTHandler fftHandler; 

    sem_t *qSem_ptr = &qSem; 

    sem_init(qSem_ptr, 1, 0); 

    //create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following 
    serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration 

    t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem)); 

    //allow the user to stop the program and avoid the problem of an infinite loop blocking the program 
    char inChar = getchar(); 
    if (inChar) {...some logic to stop reading} 
} 




namespace Foo{ 

    boost::thread *t1; 
    boost::thread *t2; 
    sem_t qSem; 
    std::queue<double> q; 
    boost::mutex mutex_; 

    class SerialHandler{ 
    private: 
     char *rawBuffer; //array to hold incoming data 
     boost::asio::io_service ioService; 
     boost::asio::serial_port_ptr serialPort; 
    public: 

      void SerialHandler::StartConnection(int _baudRate, string _comPort){ 
       //some functionality to open the port that is irrelevant to the question goes here 

       AsyncReadSome(); //starts the read loop 

       //create thread for io_service object and let function go out of scope 
       t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService)); 

      } 

      void SerialHandler::AsyncReadSome(){ 

       //there's some other stuff here for error_catching, but this is the only important part 
       serialPort->async_read_some (
          boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE), 
          boost::bind(
            &SerialHandler::HandlePortOnReceive, 
            this, boost::asio::placeholders::error, 
            boost::asio::placeholders::bytes_transferred, q)); 
      } 

      void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){ 
       boost::mutex::scoped_lock lock(mutex_); 
       //more error checking goes here, but I've made sure they aren't returning and are not the issue 

       for (unsigned int i =0; i<bytes_transferred; i++){ 
        unsigned char c = rawBuffer[i]; 
        double d = (double) c; //loop through buffer and read 
        if (c==endOfLineChar){ 
        } else //if not delimiting char, push into queue and post semaphore 
        { 
          q.push(d); 
          //cout << d << endl; 
          sem_post(&qSem); 
          cout << q.front() << endl; 
          cout << "size is: " << q.size() << endl; 
        } 
       } 
       //loop back on itself and start the next read 
       AsyncReadSome(); 

      } 
    } 

    class FFTHandler{ 
    private: 
     double *in; //array to hold inputs 
     fftw_complex *out; //holds outputs 
     int currentIndex; 
     bool filled; 
     const int N; 
    public: 


     void AppendIn(std::queue<double> &q, sem_t &qSem){ 
       while(1){ //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively... 
        cout << "test" << endl; 
        sem_wait(&_qSem); //wait for data...this is blocking but I don't know why 
        double d = _q.front(); 
        _q.pop(); 
        in[currentIndex]=d; //read queue, pop, then append in array 
        currentIndex++; 
        if (currentIndex == N){ //run FFT if full and reset index 
          currentIndex = N-overlap-1; 
          filled = true; 
          RunFFT(); 
        } 
       } 

     } 
    } 

} 

,在FFTHandler::AppendIn(..)調試行確實是射擊,所以正在創建線程,但它immediateley走出去的範圍似乎並破壞了線程,因爲它似乎已經設置了對信號量進行錯誤響應的時間。

TLDR:這是一個很長的解釋簡單地說,「我不明白,但信號燈需要以某種方式實現這些我努力了,失敗了,所以現在我來這裏希望得到這個代碼的幫助。從別人比我更瞭解

更新:所以一些調試語句玩耍後,似乎問題是while(1){...}聲明確實是射擊,但該sem_wait(&_qSem);導致它塊什麼的。因爲它無限期地等待着,儘管信號量已經被髮布,但它仍然在等待,並且永遠不會超出該線。

+1

我看不出任何具體的_semaphore usage_錯誤,但是我可以看到線程潛在的問題 - 在線程完成之前'main'退出,例如? (從你剛剛說的代碼中不清楚「...一些邏輯停止閱讀」) –

+2

爲什麼你有Seri​​alHandler和FFTHandler運行在不同的線程?如果意圖是將工作傳遞給另一個線程,以便SerialHandler可以接收下一組數據,那麼您可能需要考慮Leader/Followers模式,並有一個運行io_service的線程池。在這裏,每個線程都將使用SerialHandler讀取數據並使用FFTHandler處理數據,一旦完成,將返回到池中以等待下一個io事件。 – aichao

+0

@aichao我有Seri​​alHandler io_service在一個單獨的線程上運行,因爲它在處理程序中循環回去,因此它創建了一個無限循環。但是,我希望能夠停止程序,不可阻擋的無限循環並不是一個理想的功能,所以我將它移動到另一個線程,以便能夠接受用戶輸入以最終在用戶感覺喜歡時終止該循環。 'FFTHandler :: AppendIn(...)'函數在它自己的線程中是出於類似的原因,不阻塞主線程並釋放用戶在程序運行時能夠做的事情(即停止程序) 。 – Scorch

回答

2

既然您已經在使用boost::mutex及其範圍鎖定類型,我建議您使用boost::condition_variable而不是POSIX信號量。否則,你將C++ 11風格的同步與POSIX同步混合在一起。

添加到隊列時鎖定互斥鎖,但我沒有看到鎖定互斥鎖以從隊列中讀取的任何內容。它也看起來像循環回調到AsyncReadSome而互斥鎖仍處於鎖定狀態。

選擇單一形式的同步,然後正確使用它。

+0

感謝您的建議,而這似乎清除了一些混淆。我拉動信號量,並根據你的想法使用boost :: condition_variable,這似乎有效。雖然它引入了另一個問題(分段錯誤),但據我所知,這與信令和條件變量無關。 – Scorch

1

信號量的初始值爲0,對此情況有效。所以它需要一個sem_post爲FFTHandler::AppendIn()被解除封鎖。但是我沒有看到第一次調用SerialHandler::AsyncReadSome()的代碼來讀取串行端口並將其發送到隊列中。如果你修復了那部分代碼,我認爲sem_post會發生並且FFTHandler線程會運行。作爲第一步,您可以在sem_wait和AsyncReadSome()函數內打印一個調試打印,而我的猜測是兩者都不會被執行。

所以,基本上你會希望確保'閱讀'被啓動並作爲主線程或不同線程的一部分保持活動狀態。

+0

我很抱歉,那是我的錯,我意識到我在複製代碼時犯了一個錯誤。在創建新線程t1之前,我正在調用AsyncReadSome()。我會更新上面的代碼。但是,爲了以防萬一,我確實創建了一些調試行,並且這些行正在觸發,因此它正在執行AsyncReadSome()。這就是說,在玩弄你的建議的同時,我發現了一些與信號量有關的有趣事情,所以我也會將其添加到該問題中。 – Scorch

+0

您在更新中提出的觀點正是我的意思:)感謝您保持發佈。 – Gops