2017-08-08 81 views
1

我一直在測試將IO完成端口與線程池中的工作線程相結合,並偶然發現我無法解釋的行爲。特別是,雖然下面的代碼:從線程池工作線程使用GetQueuedCompletionStatus的奇怪行爲

int data; 
    for (int i = 0; i < NUM; ++i) 
     PostQueuedCompletionStatus(cp, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data)); 

    { 
     std::thread t([&]() 
     { 
      LPOVERLAPPED aux; 
      DWORD  cmd; 
      ULONG_PTR key; 

      for (int i = 0; i < NUM; ++i) 
      { 
       if (!GetQueuedCompletionStatus(cp, &cmd, &key, &aux, 0)) 
       break; 
       ++count; 
      } 
     }); 

     t.join(); 
    } 

作品完美的罰款,並接收NUM狀態通知(與NUM是大量的,10萬以上),使用線程池工作對象,上面寫着一個狀態通知了類似的代碼每個工作項目並在閱讀後重新發布工作項目,在閱讀幾百個狀態通知後失敗。具有以下全局變量(請不要介意名稱):

HANDLE cport; 
PTP_POOL pool; 
TP_CALLBACK_ENVIRON env; 
PTP_WORK work; 
std::size_t num_calls; 
std::mutex mutex; 
std::condition_variable cv; 
bool job_done; 

和回調函數:

static VOID CALLBACK callback(PTP_CALLBACK_INSTANCE instance_, PVOID pv_, PTP_WORK work_) 
{ 
    LPOVERLAPPED aux; 
    DWORD  cmd; 
    ULONG_PTR key; 

    if (GetQueuedCompletionStatus(cport, &cmd, &key, &aux, 0)) 
    { 
    ++num_calls; 
    SubmitThreadpoolWork(work); 
    } 
    else 
    { 
    std::unique_lock<std::mutex> l(mutex); 
    std::cout << "No work after " << num_calls << " calls.\n"; 
    job_done = true; 
    cv.notify_one(); 
    } 
} 

下面的代碼:

{ 
    job_done = false; 
    std::unique_lock<std::mutex> l(mutex); 

    num_calls = 0; 
    cport = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); 

    pool = CreateThreadpool(nullptr); 
    InitializeThreadpoolEnvironment(&env); 
    SetThreadpoolCallbackPool(&env, pool); 

    work = CreateThreadpoolWork(callback, nullptr, &env); 

    for (int i = 0; i < NUM; ++i) 
     PostQueuedCompletionStatus(cport, 1, NULL, reinterpret_cast<LPOVERLAPPED>(&data)); 

    SubmitThreadpoolWork(work); 
    cv.wait_for(l, std::chrono::milliseconds(10000), [] { return job_done; }); 
} 

將報告「沒有更多的工作之後......「250個左右之後調用GetQueuedCompletionStatus,儘管NUM被設置爲1000000.更加奇怪的是,將等待從0設置爲10毫秒會增加succ的數量精心打電話給幾十萬人,偶爾會讀取所有1000000個通知。由於所有狀態通知都是在首次提交工作對象之前發佈的,所以我不太瞭解。

是否有可能確實存在將完成端口和線程池組合在一起的問題,或者在我的代碼中出現錯誤?請不要進入爲什麼我想要這樣做 - 我正在調查的可能性,並在這方面偶然發現。在我看來,它應該工作,並不能指出什麼是錯的。謝謝。

+0

您應檢查由'PostQueuedCompletionStatus'(和其他WINAPI函數)返回的值,並檢查'GetLastError'如果失敗。 – VTT

+0

完整的代碼做到了這一點,爲了簡單起見,我刪除了檢查。沒有錯誤報告。 –

+0

你應該添加它們。 – VTT

回答

0

我試過運行這段代碼,問題似乎是提供給CreateIoCompletionPortNumberOfConcurrentThreads參數。傳遞1意味着執行callback的第一個池線程與io完成端口關聯,但由於線程池可能執行callback使用不同的線程GetQueuedCompletionStatus會在發生這種情況時失敗。 From documentation

要仔細考慮的I/O完成端口最重要的屬性是併發值。通過參數NumberOfConcurrentThreads通過CreateIoCompletionPort創建完成端口的併發值。此值限制與完成端口關聯的可運行線程的數量。當與完成端口相關聯的可運行線程的總數達到併發值時,系統將阻止與該完成端口相關聯的任何後續線程的執行,直到可運行線程的數量下降到低於併發值。

儘管任意數量的線程可以調用GetQueuedCompletionStatus爲指定的I/O完成端口,當指定的線程調用GetQueuedCompletionStatus在第一時間,就成了具有指定I/O完成端口,直到三件事情之一相關情況:該線程退出,指定不同的I/O完成端口,或者關閉I/O完成端口。換句話說,一個線程最多可以與一個I/O完成端口相關聯。

所以使用IO使用線程池完成你需要的併發線程數設置爲線程池的大小(你可以使用SetThreadpoolThreadMaximum設置)。

::DWORD const threads_count{1}; 

cport = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threads_count); 
... 
pool = ::CreateThreadpool(nullptr); 
::SetThreadpoolThreadMaximum(pool, threads_count); 
+0

謝謝,先生!這確實解決了這個問題。 –