2012-02-25 19 views
0

我有一個使用4個線程,以執行以下操作一個多線程的OpenCV程序:OpenCV的多線程(在Windows/.NET)延遲來自視頻捕獲幾秒

螺紋1->調用cvQueryFrame()其從攝像機抓取的幀圖像逐個並且將它們存儲到std::vectorinputBuffer

螺紋2->上inputBuffer[0]進行閾值化,導致複製到另一個std::vector稱爲filterOutputBuffer

螺紋3->執行光流算法/吸引流在filterOutputBuffer前兩個元素字段,副本導致到另一個std::vector稱爲ofOutputBuffer

螺紋4->顯示使用cvShowImage(ofOutputBuffer[0])

所以基本上我被設想每個線程對應的第一個元素上執行該任務的圖像輸入向量/緩衝區,並將結果存儲在相應輸出向量的後面。像3名工廠工人一樣在裝配線上工作,然後將最終結果投入到下一個人的桶中。

我爲所有緩衝區設置了互斥鎖,程序工作正常,只有輸出延遲了幾秒鐘。

我運行了一個非多線程版本的相同程序(使用一個巨大的while(true)循環),它只是偶爾出現口吃而實時運行。

爲什麼我的併發執行在性能上如此之慢?

下面是線程函數:

void writeBuffer() 
    { 
     cout << "Thread " << GetCurrentThreadId() << ": Capturing frame from camera!" << endl; 
     CvCapture *capture = 0; 
     IplImage *frame = 0; 
     DWORD waitResult; 

     if (!(capture = cvCaptureFromCAM(0))) 
      cout << "Cannot initialize camera!" << endl; 

     //now start grabbing frames and storing into the vector inputBuffer 
     while (true) 
     { 
      //cout << "Thread " << GetCurrentThreadId() << ": Waiting for mutex to write to input buffer!..." << endl; 
      waitResult = WaitForSingleObject(hMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        frame = cvQueryFrame(capture); //store the image into frame 
        if(!frame) 
        { 
         cout << "Thread " << GetCurrentThreadId() << ": Error capturing frame from camera!" << endl; 
        } 
        //cout << "Thread " << GetCurrentThreadId() << ": Getting Frame..." << endl; 
        inputBuffer.push_back(*frame); 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring mutex..." << endl; 
      } 
      if(!ReleaseMutex(hMutex)) 
      { 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing mutex..." << endl; 
      } 
      //else cout << "Thread " << GetCurrentThreadId() << ": Done writing to input buffer, Mutex Released!" << endl; 
      //signal hDoneGettingFrame 
      PulseEvent(hDoneGettingFrame); 
     } 
      cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
    } 


    void opticalFlow() 
    { 
    ... 
     DWORD waitResult; 

     //start grabbing frames from the vector inputBuffer 
     cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl; 
     while(true) 
     { 
      waitResult = WaitForSingleObject(fMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        //grab first two frames from buffer (inputBuffer[0-1]) and process them 
        if(filterOutputBuffer.size() > 1) 
        { 
         frame1 = filterOutputBuffer[0]; 
         frame2 = filterOutputBuffer[1]; 
         filterOutputBuffer.erase(filterOutputBuffer.begin()); 
        } 
        else 
        { 
         if(!ReleaseMutex(fMutex)) 
          cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl; 
         //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl; 
         continue; 
        } 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl; 
        continue; 
      } 
      if(!ReleaseMutex(fMutex)) 
      { 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
      } 
    ... 
    //Do optical flow stuff 
    ... 
    waitResult = WaitForSingleObject(oMutex, INFINITE); 
      switch(waitResult) 
      { 
       // The thread got ownership of the mutex 
       case WAIT_OBJECT_0: 
        //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl; 
        ofOutputBuffer.push_back(*frame1_3C); 
       break; 
       default: 
        cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl; 
      } 
      if(!ReleaseMutex(oMutex)) 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl; 
    } 
     cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
    } 

    void filterImage() 
{ 
    DWORD waitResult; 
... 

    //start grabbing frames from the vector inputBuffer 
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from input buffer..." << endl; 
    while(true) 
    { 
     waitResult = WaitForSingleObject(hMutex, INFINITE); 
     switch(waitResult) 
     { 
      // The thread got ownership of the mutex 
      case WAIT_OBJECT_0: 
       //grab first frame and then release mutex 
       if(inputBuffer.size() > 0) 
       { 
        frame = inputBuffer[0]; 
        inputBuffer.erase(inputBuffer.begin()); 
       } 
       else 
       { 
        if(!ReleaseMutex(hMutex)) 
         cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
        //else cout << "Thread " << GetCurrentThreadId() << ": Input Buffer empty!" << endl; 
        continue; 
       } 
      break; 
      default: 
       cout << "Thread " << GetCurrentThreadId() << ": Error acquiring input mutex..." << endl; 
       continue; 
     } 
     if(!ReleaseMutex(hMutex)) 
     { 
      cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
     } 
... 
//Tresholding Image Stuff 
... 
     //cout << "Thread " << GetCurrentThreadId() << ": Waiting to write to output buffer..." << endl; 
     waitResult = WaitForSingleObject(fMutex, INFINITE); 
     switch(waitResult) 
     { 
      // The thread got ownership of the mutex 
      case WAIT_OBJECT_0: 
       //cout << "Thread " << GetCurrentThreadId() << ": WRITING TO OUTPUT BUFFER..." << endl; 
       filterOutputBuffer.push_back(*out); 
      break; 
      default: 
       cout << "Thread " << GetCurrentThreadId() << ": Error acquiring filter mutex..." << endl; 
     } 
     if(!ReleaseMutex(fMutex)) 
      cout << "Thread " << GetCurrentThreadId() << ": Error releasing filter mutex..." << endl; 

    } 
} 

void displayImage() 
{ 
    DWORD waitResult; 
    IplImage final; 
    int c; 
    cvNamedWindow("Image", CV_WINDOW_AUTOSIZE); 
    //start grabbing frames from the vector ouputBuffer 
    cout << "Thread " << GetCurrentThreadId() << ": Waiting to read from output buffer..." << endl; 
    while (true) 
    { 
      waitResult = WaitForSingleObject(oMutex, INFINITE); 
      switch(waitResult) 
      { 
        // The thread got ownership of the mutex 
        case WAIT_OBJECT_0: 
         if(ofOutputBuffer.size() > 0) 
         { 
          //cout << "Thread " << GetCurrentThreadId() << ": Reading output buffer..." << endl; 
          final = ofOutputBuffer[0]; 
          ofOutputBuffer.erase(ofOutputBuffer.begin()); 
         } 
         else 
         { 
          if(!ReleaseMutex(oMutex)) 
           cout << "Thread " << GetCurrentThreadId() << ": Error releasing output mutex..." << endl; 
          //else cout << "Thread " << GetCurrentThreadId() << ": Output Buffer is empty!" << endl; 
          continue; 
         } 
        break; 
        default: 
         cout << "Thread " << GetCurrentThreadId() << ": Error acquiring output mutex..." << endl; 
         continue; 
      } 
      if(!ReleaseMutex(oMutex)) 
       cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
      //else cout << "Thread " << GetCurrentThreadId() << ": Done reading output buffer, mutex Released!" << endl; 

      //cout << "Thread " << GetCurrentThreadId() << ": Displaying Image..." << endl; 
      cvShowImage("Image", &final); 
      c = cvWaitKey(1); 
    } 
    cout << "Thread " << GetCurrentThreadId() << ": Exiting..." << endl; 
} 

這裏的主要功能是:

void main() 
{ 
    hMutex = CreateMutex(NULL, FALSE, NULL); 
    oMutex = CreateMutex(NULL, FALSE, NULL); 
    fMutex = CreateMutex(NULL, FALSE, NULL); 

    hDoneGettingFrame = CreateEvent(NULL, TRUE, FALSE, NULL); 
    hDoneReadingFrame = CreateEvent(NULL, TRUE, FALSE, NULL); 

    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID); 
    TName[1]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)filterImage, NULL, 0, &ThreadID); 
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID); 
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID); 
    WaitForMultipleObjects(4, TName, TRUE, INFINITE); 
    CloseHandle(TName); 
} 

回答

0

信號量做到了!我不是使用單獨的互斥體,而是創建了一個信號量,讓所有線程都通過它。

謝謝,它現在運行的很快,很順利!

void main() 
{ 
    hSemaphore = CreateSemaphore( 
     NULL,   // default security attributes 
     MAX_THREADS, // available count (when a thread enters, it decreases) 
     MAX_THREADS, // maximum count 
     NULL);   // unnamed semaphore 

    if (hSemaphore == NULL) 
    { 
     printf("CreateSemaphore error: %d\n", GetLastError()); 
     return; 
    } 


    TName[0]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)writeBuffer, NULL, 0, &ThreadID); 
    TName[2]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)opticalFlow, NULL, 0, &ThreadID); 
    TName[3]= CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)displayImage, NULL, 0, &ThreadID); 
    WaitForMultipleObjects(4, TName, TRUE, INFINITE); 
    CloseHandle(TName); 
} 

而在線程...

//instead of separate Mutexes, just wait for semaphore 
    waitResult = WaitForSingleObject(hSemaphore, INFINITE); 
       switch(waitResult) 
       { 

          ... 

          } 
       if(!ReleaseSemaphore(hSemaphore, 1, NULL)) 
        cout << "Thread " << GetCurrentThreadId() << ": Error releasing input mutex..." << endl; 
+0

另外,我組合filterImage線程到opticalFlow線程 – Josh 2012-02-29 02:05:47

+0

不知道你做了什麼,在這裏。每個線程間隊列都需要自己的信號量來計數隊列項和自己的互斥量,以保護隊列不受多個訪問的影響。如果有5個線程,它們之間有4個階段,則需要4個隊列,4個信號量和4個互斥量。 – 2012-03-02 08:54:35

+0

我創建了一個信號量,並且所有線程都進入工作狀態,然後離開。如果我必須使用互斥體,信號量的目的究竟是什麼? – Josh 2012-03-09 02:48:55

0

多線程應用程序共享線程之間的CPU時間。因此,當另一個線程想要處於運行狀態時存在上下文切換。可能在線程之間切換會增加CPU時間,導致應用程序變慢。

0

嘗試使用threadPool可以最大限度地減少CPU在線程之間傳輸的時間。

+0

也許,如果我移動閾值和光流到同一個線程將最大限度地減少上下文切換? – Josh 2012-02-25 21:09:58

0

好了,下手,如果我搖你的第一個線程函數的有點繞環路:

if(!ReleaseMutex(hMutex)){} 
PulseEvent(hDoneGettingFrame); 
waitResult = WaitForSingleObject(hMutex, INFINITE); 

換一種方式,你的第一個線程持有到隊列互斥幾乎整個運行的第一個線程循環,所以阻止第二個線程到達任何地方。我猜所有其他線程的代碼都是一樣的嗎?

當在一個流水線中推送生產者 - 消費者隊列中的數據時,這個想法是你應該保持隊列鎖定的時間最短。在緩衝區對象上進行處理,然後鎖定隊列,按下對象引用,然後立即解鎖隊列。然後發出信號(或類似信號),以便下一個線程可以處理該對象。

不要保持隊列鎖定!該互斥體不應該存在於其他線程等待工作 - 這是爲了保護隊列不受多個訪問的影響。您需要一些其他信號來維護隊列計數和線程等待工作。不要爲此使用事件,無論您在網上看到多少其他示例 - 如果必須推出自己的生產者 - 消費者隊列,請使用信號量。

更好 - 使用已經工作的P-C隊列類 - 查看BlockingCollections類。