2017-09-07 24 views
2

我的代碼獲取圖像並對其進行處理。性能對我的代碼至關重要,所以我嘗試了多線程。目前,我只是把收購部分作爲一個獨立的線程。我正在使用std::queue實現一個簡單的FIFO緩衝區,用於存儲獲取的圖像。採集功能AcquireImages無限期地將原始圖像數據寫入該緩衝區,直到用戶中斷。處理函數,ProcessImages讀取緩衝區並處理圖像數據(目前在主線程中,但我計劃將其作爲單獨的線程以及一旦我解決了問題)。這裏是我的代碼(改性形成MCV example):如何讀取與另一個線程共享的std :: queue?

#include <iostream> 
#include <vector> 
#include <queue> 
#include <atomic> 
#include <thread> 

#define NUM_CAMERAS 2 

void AcquireImages(std::queue<unsigned char*> &rawImageQueue, std::atomic<bool> &quit) 
{ 
    unsigned char* rawImage{}; 

    while (!quit) 
    { 
     for (int camera = 0; camera < NUM_CAMERAS; camera++) 
     { 
      switch (camera) 
      { 
      case 0: 
       rawImage = (unsigned char*)"Cam0Image"; 
       break; 
      case 1: 
       rawImage = (unsigned char*)"Cam1Image"; 
       break; 
      default: 
       break; 
      } 

      rawImageQueue.push(std::move(rawImage)); 
     } 
    } 
} 

int ProcessImages(const std::vector<unsigned char*> &rawImageVec, const int count) 
{ 
    // Do something to the raw image vector 

    if (count > 10) 
    { 
     return 1; 
    } 
    else 
    { 
     return 0; 
    } // In my application, this function only returns non-zero upon user interception. 
} 


int main() 
{ 
    // Preparation 
    std::vector<unsigned char*> rawImageVec; 
    rawImageVec.reserve(NUM_CAMERAS); 
    std::queue<unsigned char*> rawImageQueue; 
    int count{}; 

    const unsigned int nThreads = 1; // this might grow later 

    std::atomic<bool> loopFlags[nThreads]; 
    std::thread  threads[nThreads]; 

    // Start threads 
    for (int i = 0; i < nThreads; i++) { 
     loopFlags[i] = false; 
     threads[i] = std::thread(AcquireImages, rawImageQueue, ref(loopFlags[i])); 
    } 

    // Process images 
    while (true) 
    { 

     // Process the images 
     for (int cam{}; cam < NUM_CAMERAS; ++cam) 
     { 
      rawImageVec.push_back(rawImageQueue.front()); 
      rawImageQueue.pop(); 
     } 

     int processResult = ProcessImages(move(rawImageVec), count); 
     if (processResult) 
     { 
      std::cout << "Leaving while loop.\n"; // In my application this is triggered by the user 
      break; 
     } 

     rawImageVec.clear(); 
     ++count; 
    } 

    // Shutdown other threads 
    for (auto & flag : loopFlags) { 
     flag = true; 
    } 

    // Wait for threads to actually finish. 
    for (auto& thread : threads) { 
     thread.join(); 
    } 

    return 0; 
} 

你們有些人可能已經注意到我的失誤。我所知道的是,這個程序在rawImageVec.push_back(rawImageQueue.front());處拋出異常。

拋出異常後輸出內容如下:

Debug Assertion Failed! 

Program: C:\WINDOWS\SYSTEM32\MSVCP140D.dll 
File: c:\program files (x86)\microsoft visual studio 14.0\vc\include\deque 
Line: 329 

Expression: deque iterator not dereferencable 

我認識這個問題的原因可能是,我正在讀與另一個線程共享的東西(對嗎?)。我該如何解決這個問題?

我跟着Praetorian的建議在評論後,檢查rawImageQueue是否爲空,我發現它總是空的。我不確定是什麼原因造成的。

+3

用互斥體或類似的方法保護隊列。 –

+1

在調用'rawImageQueue.front()'之前,您沒有檢查隊列是否爲空。並且從任一線程進入隊列的所有訪問必須受互斥體保護。 – Praetorian

+1

閱讀有關「生產者消費者模式」(例如https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem),解決當前問題後首先要考慮的問題 - 當生產者生產速度更快時會發生什麼比消費者消費 –

回答

2

以下是共享隊列上生產者/消費者的一般示例。這個想法是,如果你正在從一個數據結構中讀寫,你需要對訪問進行一些保護。

爲此,下面的示例使用條件變量和互斥鎖。

#include <thread> 
#include <iostream> 
#include <chrono> 
#include <queue> 
#include <mutex> 
#include <vector> 
#include <condition_variable> 

using namespace std::chrono_literals; 
using std::vector; 
using std::thread; 
using std::unique_lock; 
using std::mutex; 
using std::condition_variable; 
using std::queue; 

class WorkQueue 
{ 
    condition_variable work_available; 
    mutex work_mutex; 
    queue<int> work; 

public: 
    void push_work(int item) 
    { 
    unique_lock<mutex> lock(work_mutex); 

    bool was_empty = work.empty(); 
    work.push(item); 

    lock.unlock(); 

    if (was_empty) 
    { 
     work_available.notify_one(); 
    }  
    } 

    int wait_and_pop() 
    { 
    unique_lock<mutex> lock(work_mutex); 
    while (work.empty()) 
    { 
     work_available.wait(lock); 
    } 

    int tmp = work.front(); 
    work.pop(); 
    return tmp; 
    } 
}; 

int main() { 
    WorkQueue work_queue; 

    auto producer = [&]() { 
    while (true) { 
     work_queue.push_work(10); 
     std::this_thread::sleep_for(2ms); 
    } 
    }; 

    vector<thread> producers; 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 
    producers.push_back(std::thread(producer)); 

    std::thread consumer([&]() {   
    while (true) 
    { 
     int work_to_do = work_queue.wait_and_pop(); 
     std::cout << "Got some work: " << work_to_do << std::endl; 
    } 
    }); 

    std::for_each(producers.begin(), producers.end(), [](thread &p) { 
    p.join(); 
    });  

    consumer.join(); 
} 
+1

這應該解決OP對空隊列的特殊問題。雖然值得一提的是你的隊列的大小是無限的,但如果工作產生得比消耗的速度快,它將會耗盡內存。 –

+1

@Gruffalo這是一個很好的觀點。在控制源代碼的最簡單的實現中,您可以向容器類添加隊列大小,並且可以推送阻止生產者。儘管如此,我認爲處理背壓依賴於實施細節。如果數據源在特定程序之外(例如任何類型的分佈式系統),則需要有一個退避協議,如tcp窗口。 – Josh

+0

您可能會失去'wait_and_pop'的'notify_one'觸發器,因爲您在'unlock' – curiousguy12

1

你的情況相對簡單,因爲你似乎只有一個生產者和一個消費者。此外,圖像處理聽起來很慢(足夠慢,不用擔心線程爭用),並且您正在從單線程版本切換,因此可能不需要打擾高效的無鎖實現。

我建議研究這個僞代碼:https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem#Using_monitors,然後在需要時瞭解條件變量:http://en.cppreference.com/w/cpp/thread/condition_variable

+0

鏈接無疑是有幫助的,儘管我需要更多的時間來使用我的代碼才能真正實現它的功能。我也發現'rawImageQueue'總是空的。 – db7638

+0

我希望你喜歡做得好,避免重新發明,因爲多線程可能會很棘手。 –

+0

這就是意圖。我不認爲我正在重新編寫我的代碼的任何部分。我仍然不明白爲什麼'rawImageQueue'是空的。 – db7638

相關問題