2010-09-11 57 views
1

性能調整:將數據寫入多個管道如何在c/C++中異步運行線程的特定函數?

現在我正在做它在一個單獨的線程:

for(unsigned int i = 0; i < myvector.size();) 
{ 
    tmp_pipe = myvector[i]; 
    fSuccess = WriteFile(tmp_pipe, &Time, sizeof(double), &dwWritten, NULL); 
    if(!fSuccess) 
    { 
     myvector.erase(myvector.begin()+i); 
     printf("Client pipe closed\r\n"); 
     continue; 
    } 
    fSuccess = WriteFile(tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL); 
    if(!fSuccess) 
    { 
     myvector.erase(myvector.begin()+i); 
     printf("Client pipe closed\r\n"); 
     continue; 
    } 
    fSuccess = WriteFile(tmp_pipe, pBuffer, BufferLen, &dwWritten, NULL); 
    if(!fSuccess) 
    { 
     myvector.erase(myvector.begin()+i); 
     printf("Client pipe closed\r\n"); 
     continue; 
    } 
    i++; 
} 

,結果是第一pipe獲取數據速度最快,最後pipe最慢的。

我正在考慮在不同的線程中這樣做,因此每個pipe都被同樣處理。

但我怎麼能在c/C++中異步運行線程的特定函數(主線程應該立即返回)?

回答

2

可以使用CreateThread函數來創建一個新的線程,並通過管道句柄作爲參數傳遞給線程函數:

DWORD PipeThread(LPVOID param) { 
    HANDLE hPipe = (HANDLE)param; 
    // Do the WriteFile operations here 
    return 0; 
} 

for(unsigned int i = 0; i < myvector.size(); i++) 
    CreateThread(NULL, 0, PipeThread, myvector[i], 0, NULL); 

注意,矢量類不是線程安全的,所以你」如果你不同步對它們的訪問,將會遇到myvector.erase的問題,例如。使用關鍵部分。


更新:既然你提到的頻率高,你可以使用I/O completion ports而不是對每個管一個單獨的線程。那麼你可以使用重疊I /與WriteFile O操作異步執行寫,你可能只是一個額外的線程監聽寫操作完成:

// Initial setup: add pipe handles to a completion port 
HANDLE hPort = CreateCompletionPort(myvector[0], NULL, 0, 1); 
for (unsigned int i = 1; i < myvector.size(); i++) 
    CreateCompletionPort(myvector[i], hPort, 0, 0); 

// Start thread 
CreateThread(NULL, 0, PipeThread, hPort, 0, NULL); 

// Do this as many times as you want 
for(unsigned int i = 0; i < myvector.size(); i++) { 
    OVERLAPPED *ov = new OVERLAPPED; 
    ZeroMemory(ov, sizeof ov); 
    WriteFile(myvector[i], buffer, size, NULL, ov); 
    // If pipe handle was closed, WriteFile will fail immediately 
    // Otherwise I/O is performed asynchronously 
} 

// Close the completion port at the end 
// This should automatically free the thread 
CloseHandle(hPort); 

--- 

DWRD PipeThread(LPVOID param) { 
    HANDLE hPort = (HANDLE)param; 
    DWORD nBytes; 
    ULONG_PTR key; 
    LPOVERLAPPED ov; 

    // Continuously loop for I/O completion 
    while (GetQueuedCompletionStatus(hPort, &nBytes, &key, &ov, INFINITE)) { 
    if (ov != NULL) { 
     delete ov; 
     // Do anything else you may want to do here 
    } 
    } 

    return 0; 
} 
+0

我上面的'for'循環運行頻率非常高,所以我不能每次都運行'CreateThread',所以一切都應該假設這個線程已經創建好了。 – COMer 2010-09-11 04:44:42

+0

@COMer我不認爲這個解決方案是每次都調用CreateThread ...每次代碼的一部分是for循環標記爲多次你想要的。 – computinglife 2010-09-14 03:28:10

0

你有writev()可用?如果是這樣,您可以將三個寫入操作減少到每個管道一個,這會更有效。它也略有簡化了錯誤處理,但你可能崩潰,你有什麼:

for (unsigned int i = 0; i < myvector.size(); i++) 
{ 
    tmp_pipe = myvector[i]; 
    if (!WriteFile(tmp_pipe, &Time,  sizeof(double), &dwWritten, NULL) || 
     !WriteFile(tmp_pipe, &BufferLen, sizeof(long), &dwWritten, NULL) || 
     !WriteFile(tmp_pipe, pBuffer, BufferLen,  &dwWritten, NULL)) 
    { 
     myvector.erase(myvector.begin()+i); 
     printf("Client pipe closed\r\n"); 
    } 
} 

這是簡單的在許多方面看,因爲有1/3的錯誤處理 - 這樣的操作代碼是隱藏的少。

當然,您仍然希望將此代碼封裝到線程代碼中,因此寫操作將由單獨的線程處理。你會安排每個線程來獲得自己的管道;他們會共享對時間和緩衝區的只讀訪問權限。每個線程都會完成寫入,並在完成時返回狀態。父線程將等待每個子線程,並且如果子線程報告它失敗,則相應的客戶機管道將從該向量中移除。由於只有父線程會操縱向量,因此不需要擔心線程問題。

在大綱:

for (i = 0; i < myvector.size(); i++) 
     tid[i] = thread create (myvector[i], write_to_pipe); 
for (i = 0; i < myvector.size(); i++) 
{ 
     status = wait for thread(tid[i]); 
     if (status != success) 
      myvector.erase(...); 
} 

陣列(或向量)tid保持線身份。 write_to_pipe()函數是線程的主要功能;它會在傳遞的管道上進行書寫,並以適當的狀態退出。

+0

這裏的主要思想是減少'wait',所以我認爲'等待線程'不會提高性能。 – COMer 2010-09-11 06:50:09

+0

@COMer:我的印象是,目標是讓數據同時寫入多個目標。如果讓線程自由運行,那麼協調問題會更加嚴重 - 每個線程如何知道要寫什麼,以及何時寫它;協調器線程如何知道何時可以安全地更改寫入的數據值;協調員如何知道其中一個線程由於寫入失敗而放棄。 – 2010-09-11 13:34:10

0

這些命名管道?如果是這樣,創建它們時可以使用FILE_FLAG_OVERLAPPED,這允許您在不處理線程的情況下執行異步寫入。這裏是an example from MSDN

如果這些是匿名管道,重疊的I/O是not supported,所以這可能是切換到命名管道的一個很好的理由。

對於每次寫入,另一個選項可能是queue a work item,但這並不能保證所有三次寫入都將同時執行。

0

我會考慮以試圖減少I/O調用數量的方式來準備數據。在知道數據正在以儘可能少的I/O調用的高效方式進行編寫之後,我會考慮使用異步I/O。如果性能還不夠好,那麼可以考慮在設計中增加額外的線程。

您可能能夠減少寫入次數的一種方法是使用一種結合所有數據的結構,以便可以使用一次寫入而不是三次。爲了擺脫編譯器可能添加的任何額外的填充/對齊,編譯器將需要。

#pragma pack(push,1) 
struct PipeData { 
    double _time; 
    long _buffer_len; 
    char* _buffer; 
}; 
#pragma pack(pop) 

PipeData data; 
int data_len = sizeof(double) + sizeof(long) + <yourbufferlen>; 

for(unsigned int i = 0; i < myvector.size();) 
{ 
    tmp_pipe = myvector[i]; 
    fSuccess = WriteFile(tmp_pipe, &data, data_len, &dwWritten, NULL); 
    if(!fSuccess) 
    { 
     myvector.erase(myvector.begin()+i); 
     printf("Client pipe closed\r\n"); 
     continue; 
    } 
    i++; 
} 
0

要「在C/C++如何運行的線程異步的特定功能(在主線程應該得到立即返回)?」具體回答你的問題

你可以通過解除這兩個行爲輕鬆地做到這一點。創建一個工作線程池,由它需要與之通信的管道初始化,並編寫一個函數,該函數將爲這些線程安排一個新的工作。與管道寫入相比,這將是即時的,並且所有線程都將獲得其工作,並將開始寫入其控制的管道,而主線程可以繼續工作。

如果您的客戶數量不多,這將是一個簡單的解決方案。如果不是這樣,爲每個客戶端創建一個線程在一個點之後將不會擴展很多,並且您的服務器將會被大量的上下文切換和線程爭用所壓倒。

在這種情況下,對於非常服務器的設計,您應該認真思考卡薩布蘭卡的解決方案。它只創建一個線程來偵聽完成通知,並且是Windows 2003中用於在Windows中創建服務器的最有效的服務器設計。

相關問題