2011-07-16 130 views
2

我想在服務器和客戶端之間有非阻塞I/O。這兩個連接後,我試圖用fork來處理IO,但是在嘗試讀取「Transport endpoint is not connected」時服務器端發生錯誤,並且它發生了兩次(因爲我猜的是fork) 。簡單套接字非阻塞I/O

Server代碼

//includes taken out 

#define PORT "4950" 
#define STDIN 0 

struct sockaddr name; 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 


int main(int agrc, char** argv) { 
    int status, sock, adrlen, new_sd; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    //store the connecting address and size 
    struct sockaddr_storage their_addr; 
    socklen_t their_addr_size; 

    //socket infoS 
    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    //allow reuse of port 
    int yes=1; 
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) { 
     perror("setsockopt"); 
     exit(1); 
    } 

    //unlink and bind 
    unlink("127.0.0.1"); 
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nBind error %m", errno); 
     exit(1); 
    } 

    freeaddrinfo(servinfo); 

    //listen 
    if(listen(sock, 5) < 0) { 
     printf("\nListen error %m", errno); 
     exit(1); 
    } 
    their_addr_size = sizeof(their_addr); 
    //accept 
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size); 
    if(new_sd < 0) { 
     printf("\nAccept error %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful Connection!"; 

    //set nonblock 
    set_nonblock(new_sd); 

    char* in = new char[255]; 
    char* out = new char[255]; 
    int numSent; 
    int numRead; 
    pid_t pid; 

    fork(); 
    pid = getpid(); 

    if(pid == 0) { 

     while(!(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't')) { 

      fgets(out, 255, stdin); 
      numSent = send(sock, out, strlen(out), 0); 

      if(numSent < 0) { 
       printf("\nError sending %m", errno); 
       exit(1); 
      } //end error 
     } //end while 
    } //end child 

    else { 
     numRead = recv(sock, in, 255, 0); 
     if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } //end error 
     else { 
      cout<<in; 
      for(int i=0;i<255;i++) 
       in[i] = '\0'; 

     } //end else 
    } //end parent 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

客戶端代碼

//包括取出

#define PORT "4950" 

struct sockaddr name; 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, sock, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nclient connection failure %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful connection!"; 

    //set nonblock 
    set_nonblock(sock); 

    char* out = new char[255]; 
    char* in = new char[255]; 
    int numRead; 
    int numSent; 
    pid_t pid; 

    fork(); 
    pid = getpid(); 

    if(pid == 0) { 

     while(!(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't')) { 

      fgets(out, 255, stdin); 
      numSent = send(sock, out, strlen(out), 0); 

      if(numSent < 0) { 
       printf("\nError sending %m", errno); 
       exit(1); 
      } //end error 
     } //end while 
    } //end child process 

    else { 
     while(!(in[0] == 'q' && in[1] == 'u' && in[2] == 'i' && in[3] == 't')) { 
     numRead = recv(sock, in, 255, 0); 
      cout<<in; 
      for(int i=0;i<255;i++) 
       in[i] = '\0'; 
     } 
    } //end parent process 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

我也使用線程做I/O嘗試。有問題,當我運行程序時,它就像線程不會發生。該程序只是運行「成功連接」,然後「正常退出」。我在while(1)循環中添加了一些cout語句,並且他們打印了幾次,但是由於某種原因它們停止了。我不確定它是否與我的線程或套接字有關。代碼(非常類似於上面),因爲這是在這裏 -

服務器

//includes taken out 

#define PORT "4950" 
#define STDIN 0 

pthread_t readthread; 
pthread_t sendthread; 

char* in = new char[255]; 
char* out = new char[255]; 
int numSent; 
int numRead; 

struct sockaddr name; 
int sock, new_sd; 

void* readThread(void* threadid) { 

    while(1) { 

     numRead = recv(new_sd, in, 255, 0); 

     if(numRead > 0) { 
      cout<<"\n"<<in; 
      for(int i=0;i<strlen(in);i++) 
       in[i] = '\0'; 
     } //end if 
     else if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } 

    } //end while 
} //END READTHREAD 


void* sendThread(void* threadid) { 

    while(1) { 

     cin.getline(out, 255); 

     numSent = send(new_sd, out, 255, 0); 

     if(numSent > 0) { 
      for(int i=0;i<strlen(out);i++) 
       out[i] = '\0'; 
     } //end if 
     else if(numSent < 0) { 
      printf("\nError sending %m", errno); 
      exit(1); 
     } 
    } //end while 
} //END SENDTHREAD 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 

// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    //store the connecting address and size 
    struct sockaddr_storage their_addr; 
    socklen_t their_addr_size; 

    //socket infoS 
    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    //allow reuse of port 
    int yes=1; 
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) { 
     perror("setsockopt"); 
     exit(1); 
    } 

    //unlink and bind 
    unlink("127.0.0.1"); 
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nBind error %m", errno); 
     exit(1); 
    } 

    freeaddrinfo(servinfo); 

    //listen 
    if(listen(sock, 5) < 0) { 
     printf("\nListen error %m", errno); 
     exit(1); 
    } 

    their_addr_size = sizeof(their_addr); 
    //accept 
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size); 
    if(new_sd < 0) { 
     printf("\nAccept error %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful Connection!"; 

    //set nonblock 
    set_nonblock(new_sd); 

    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 

    pthread_create(&readthread, &attr, readThread, (void*)0); 
    pthread_create(&sendthread, &attr, sendThread, (void*)1); 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

客戶

#define PORT "4950" 

pthread_t readthread; 
pthread_t sendthread; 
char* in = new char[255]; 
char* out = new char[255]; 
int numSent; 
int numRead; 
struct sockaddr name; 
int sock; 

void* readThread(void* threadid) { 

    while(1) { 

     numRead = recv(sock, in, 255, 0); 

     if(numRead > 0) { 
      cout<<"\n"<<in; 
      for(int i=0;i<strlen(in);i++) 
       in[i] = '\0'; 
     } //end if 
     else if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } 
    } //end while 
} //END READTHREAD 

void* sendThread(void* threadid) { 

    while(1) { 

     cin.getline(out, 255); 
     numSent = send(sock, out, 255, 0); 

     if(numSent > 0) { 
      for(int i=0;i<strlen(out);i++) 
       out[i] = '\0'; 
     } //end if 
     else if(numSent < 0) { 
      printf("\nError sending %m", errno); 
      exit(1); 
     } 

    } //end while 
} //END SENDTHREAD 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nclient connection failure %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful connection!"; 

    //set nonblock 
    set_nonblock(sock); 

    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 

    pthread_create(&readthread, &attr, readThread, (void*)0); 
    pthread_create(&sendthread, &attr, sendThread, (void*)1); 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

我在很長的帖子道歉,但我一直在這幾天現在,我不知道如何繼續。我嘗試過使用select(),但對於只有1個客戶端和一個服務器I/O來說,這看起來有點多。如果任何人都可以指出上面可能會出現什麼問題或其他提示(或者我只是明確選擇錯誤:)),我將不勝感激。

+0

如果您喜歡閱讀源代碼,請查看node.js項目。他們做得很好,一切都很好記錄。你可以借用一些類似的概念,或者只是採取代碼,這是麻省理工學院。 – tjameson

回答

1

我看着前兩個代碼服務器和客戶端。不幸的是,你錯誤地設計了你的程序。服務器應該做以下(在多進程的情況下):

  1. 創建一個Socket
  2. 綁定到一個端口號
  3. 切換到被動模式(通過聽())。
  4. 轉至循環以接受傳入連接。這裏sock只負責監聽新的連接,它不負責像你一樣發送/接收。
    • 接受一個新的連接。接受()
    • 創建新的進程
    • 新進程負責做送/ recv的。父進程應返回到第4步以偵聽新連接。

下面是一個僞代碼

while (1) { 
    new_sd = accept(....); 
    if (new_sd < 0) continue; // or quit 
    if (fork() == 0) { 
     // do sendin and receiving USING new_sd 
     exit(0); // child terminates here 
    } 
} 

更新:因爲你正在尋找無阻塞的情況下,我會建議你使用select()或poll()系統調用。

我覺得你要創建兩個過程,一個過程是用於發送,另一個用於接收。如果是這樣,那麼您不需要將new_sd設置爲非阻塞模式,因爲這兩個進程同時運行。如果你打算這樣做,那麼子進程將創建另一個進程,以便第一個孩子發送和第二個孩子接收。如下:

while (1) { 
    new_sd = accept(....); 
    if (new_sd < 0) continue; // or quit 
    if (fork() == 0) { 
     pid = fork(); 
     if (pid == 0) then do_sending(new_sd); 
     else do_receiving(new_sd); 
     exit(0); // child terminates here 
    } 
} 
+0

服務器代碼現在看起來像這樣。在客戶端,我做了同樣的事情減去接受呼叫(它只是(1)和叉爲IO)。但是,當客戶端連接時,當我嘗試讀取或發送errno爲「資源暫時不可用」時,我只是遇到無限錯誤。你知道這意味着什麼嗎? – Sterling

+0

這絕對是一個更好的實現。正如我上面所說的,「資源暫時不可用」是由EAGAIN錯誤引起的,它表明套接字是非阻塞的並且沒有數據。這似乎有點奇怪,但不是所有的錯誤都是平等的。如果您閱讀send(2)和recv(2)的手冊頁,您會看到不同的錯誤代碼意味着不同的事情,其中​​一些錯誤代碼需要以除打印錯誤消息之外的方式處理。 – Sean

+0

每當我將其更改爲阻止時,我都會通過對等錯誤重置連接,並凍結我的計算機。我試圖搜索谷歌,但大多數人只是問如何創建該錯誤,而不是如何繞過它。 – Sterling

2

這已經有一段時間,因爲我已經做任何POSIX編程,但下面的代碼似乎有點奇怪:

fork(); 
pid = getpid(); 
if(pid == 0) { 

IIRC,你應該檢查叉的返回值:0的子進程,父進程中的子進程的pid。

沒有看過你的代碼:)

+0

我試過了,沒有幫助。我這樣做只是因爲我之前看到的一些線索是這樣做的。 – Sterling

+1

那麼,你的getpid()應該永遠不會返回0,從fork()檢查返回是正確的方式...錯誤是其他地方:)。我會建議最終尋找一個更高級別的異步框架,但它仍然是一個很好的練習來追蹤當前的錯誤 - 抱歉,我沒有時間仔細查看代碼。 – snemarch

+0

+1,很好。 – 2011-07-16 00:54:53

4

的其餘部分,我認爲你的問題是在這裏:

numSent = send(sock, out, strlen(out), 0); 

這裏:

numRead = recv(sock, in, 255, 0); 

,你正試圖sendrecv在您的監聽套接字上。您需要使用new_sd這是可接受的套接字。請參閱accept(2)的手冊頁。

+0

肖恩,很好! – 2011-07-16 00:55:25

+0

謝謝,我將服務器端切換到了這一點。但是,現在我在嘗試收回「資源暫時不可用」時收到錯誤消息。哪些資源不可用? – Sterling

+0

該錯誤(EAGAIN)表示該套接字是非阻塞的並且沒有數據。如果你查看手冊頁,你會看到它在那裏拼寫。 – Sean