2011-01-06 16 views
4

我正在移植構建在ACE Proactor框架之上的應用程序。該應用程序對於VxWorks和Windows都運行良好,但在內核2.6.X.X的Linux(CentOS 5.5,WindRiver Linux 1.4 & 3.0)上使用librt無法實現。Linux中的同步套接字讀/寫(「全雙工」)(特定於aio)

我已經收窄的問題降到一個非常基本的問題: 應用程序開始異步(通過的aio_read)讀取套接字上操作,並隨後開始異步(通過aio_write)非常相同的插座上書寫。由於該協議是從應用程序結束時初始化的,因此讀取操作無法實現。 - 當套接字處於阻塞模式時,永遠不會寫入,協議「掛起」。 - 使用O_NONBLOCK套接字時,寫入成功,但讀取無限期地返回「EWOULDBLOCK/EAGAIN」錯誤,永遠不會恢復(即使重新啓動AIO操作)。

我經歷了多個論壇,無法找到一個明確的答案,這是否應該工作(而且我做錯了什麼事),或者不可能使用Linux AIO。如果我放棄AIO並尋求不同的實現(通過epoll/poll/select等),是否有可能?

附件是一個示例代碼來迅速重新產生在非阻塞套接字的問題:

#include <aio.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <netdb.h> 
#include <string.h> 
#include <netinet/in.h> 
#include <sys/socket.h> 
#include <sys/types.h> 
#include <assert.h> 
#include <errno.h> 

#define BUFSIZE (100) 

// Global variables 
struct aiocb *cblist[2]; 
int theSocket; 

void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer) 
{ 
    bzero((char *)pAiocb, sizeof(struct aiocb)); 

    pAiocb->aio_fildes = theSocket; 
    pAiocb->aio_nbytes = BUFSIZE; 
    pAiocb->aio_offset = 0; 
    pAiocb->aio_buf = pBuffer; 
} 

void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_read(pAiocb); 
    assert (ret >= 0); 
} 

void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_write(pAiocb); 
    assert (ret >= 0); 
} 

int main() 
{ 
    int ret; 
    int nPort = 11111; 
    char* szServer = "10.10.9.123"; 

    // Connect to the remote server 
    theSocket = socket(AF_INET, SOCK_STREAM, 0); 
    assert (theSocket >= 0); 

    struct hostent *pServer; 
    struct sockaddr_in serv_addr; 
    pServer = gethostbyname(szServer); 

    bzero((char *) &serv_addr, sizeof(serv_addr)); 
    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(nPort); 
    bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length); 

    assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0); 

    // Set the socket to be non-blocking 
    int oldFlags = fcntl(theSocket, F_GETFL) ; 
    int newFlags = oldFlags | O_NONBLOCK; 

    fcntl(theSocket, F_SETFL, newFlags); 
    printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags); 

    // Construct the AIO callbacks array 
    struct aiocb my_aiocb1, my_aiocb2; 
    char* pBuffer = new char[BUFSIZE+1]; 

    bzero((char *)cblist, sizeof(cblist)); 
    cblist[0] = &my_aiocb1; 
    cblist[1] = &my_aiocb2; 

    // Start the read and write operations on the same socket 
    IssueReadOperation(&my_aiocb1, pBuffer); 
    IssueWriteOperation(&my_aiocb2, pBuffer); 

    // Wait for I/O completion on both operations 
    int nRound = 1; 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EWOULDBLOCK); 

    // Get the return code for the read 
    { 
     ssize_t retcode = aio_return(&my_aiocb1); 
     printf("First read operation results: aio_error=%d, aio_return=%d - That's the first EWOULDBLOCK\n", ret, retcode); 
    } 

    ret = aio_error(&my_aiocb2); 
    assert (ret == EINPROGRESS); 
    printf("Write operation is still \"in progress\"\n"); 

    // Re-issue the read operation 
    IssueReadOperation(&my_aiocb1, pBuffer); 

    // Wait for I/O completion on both operations 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations for the second time 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EINPROGRESS); 
    printf("Second read operation request is suddenly marked as \"in progress\"\n"); 

    ret = aio_error(&my_aiocb2); 
    assert (ret == 0); 

    // Get the return code for the write 
    { 
     ssize_t retcode = aio_return(&my_aiocb2); 
     printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode); 
    } 

    // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely 
    do 
    { 
     printf("\naio_suspend round #%d:\n", nRound++); 
     ret = aio_suspend(cblist, 1, NULL); 
     assert (ret == 0); 

     // Check the error of the read operation and re-issue if needed 
     ret = aio_error(&my_aiocb1); 
     if (ret == EWOULDBLOCK) 
     { 
      IssueReadOperation(&my_aiocb1, pBuffer); 
      printf("EWOULDBLOCK again on the read operation!\n"); 
     } 
    } 
    while (ret == EWOULDBLOCK); 
} 

由於提前, Yotam。

+0

請嘗試使用`ace-users @ list.isis.vanderbilt.edu`郵件列表:http://www.cs.wustl.edu/~schmidt/ACE-mail.html – 2011-01-06 16:27:21

回答

3

首先,O_NONBLOCK和AIO不混合。 AIO會報告時,相應的readwrite不會阻止異步操作完成 - 與O_NONBLOCK,他們會從未塊,所以aio請求將總是立即完成(與aio_return()EWOULDBLOCK)。其次,不要對兩個同時出現的aio請求使用相同的緩衝區。在發出aio請求的時間與aio_error()告訴您它已完成時間之間,緩衝區應被視爲完全限制。

第三,對同一文件描述符的AIO請求排隊,以便給出明智的結果。這意味着在讀取完成之前您的寫入不會發生 - 如果您需要先寫入數據,則需要按相反順序發出AIO。下面將做工精細,沒有設置O_NONBLOCK

struct aiocb my_aiocb1, my_aiocb2; 
char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message"; 

const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 }; 

// Start the read and write operations on the same socket 
IssueWriteOperation(&my_aiocb2, pBuffer2); 
IssueReadOperation(&my_aiocb1, pBuffer1); 

// Wait for I/O completion on both operations 
int nRound = 1; 
int aio_status1, aio_status2; 
do { 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    aio_status1 = aio_error(&my_aiocb1); 
    if (aio_status1 == EINPROGRESS) 
     puts("aio1 still in progress."); 
    else 
     puts("aio1 completed."); 

    aio_status2 = aio_error(&my_aiocb2); 

    if (aio_status2 == EINPROGRESS) 
     puts("aio2 still in progress."); 
    else 
     puts("aio2 completed."); 
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS); 

// Get the return code for the read 
ssize_t retcode; 
retcode = aio_return(&my_aiocb1); 
printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

retcode = aio_return(&my_aiocb1); 
printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

另外,如果你不關心讀取和寫入是相對於責令對方,你可以使用dup()創建兩個文件描述符套接字,並使用一個讀取和另一個寫入 - 每個將其AIO操作分開排隊。

+1

您是否參考了「 AIO對同一個fd的請求按順序排列「?如果是這樣的話,那麼更好的解決方案就是複製句柄。 – 2011-01-07 01:38:50