2015-07-11 60 views
7

在幾個線程中共享相同的Epoll fd(而不是socket fd)是否安全?如果是這樣,每個線程都必須將自己的事件數組傳遞給epoll_wait(2)還是可以共享?在線程之間共享相同的epoll文件描述符可以嗎?

例如

void *thread_func(void *thread_args) { 
     // extract socket_fd, epoll_fd, &event, &events_array from 
     //  thread_args 
     // epoll_wait() using epoll_fd and events_array received from main 
     // now all threads would be using same epoll_fd and events array 
    } 

    void main(void) { 
     // create and bind to socket 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // pack the socket_fd, epoll_fd, &events, &events_array into 
     // thread_args struct. 

     // create multiple threads and pass thread_func and 
     // same thread_args to all threads 
    } 

或者是更好地做到這樣的:

void *thread_func(void *socket_fd) { 
     // create events_fd 
     // allocate memory for events array 
     // subscribe to events EPOLLIN and EPOLLET 
     // epoll_wait using own epoll_fd and events_array 
     // now all threads would have a separate epoll_fd with 
     // events populated on its own array 
    } 

    void main(void) { 
    // create and bind to socket 

    //create multiple threads and pass thread_func and socket_fd to 
    // all threads 
    } 

有如何做到這在C一個很好的例子?我看到的示例在main()中運行事件循環,並在檢測到事件時產生一個新線程來處理請求。我想要做的是在程序開始時創建特定數量的線程,並讓每個線程運行事件循環和處理請求。

回答

13

在幾個 線程中共享相同的Epoll fd(而不是套接字fd)是否安全。

是的,它是安全的 - 在epoll(7)接口是線程安全的 - 但這樣做的時候,你就要小心了,你至少應該使用EPOLLET(邊沿觸發模式,而不是默認的電平觸發)以避免其他線程中的虛假喚醒。這是因爲當新事件可用於處理時,電平觸發模式會喚醒每個線程。由於只有一個線程會處理它,所以會不必要地喚醒大多數線程。

如果共享的epfd用於將每個線程都必須通過它自己的事件 陣列或共享的事件數組epoll_wait()

是的,你需要每個線程在一個單獨的事件數組,或否則你會有競爭條件和令人討厭的事情可能發生。例如,您可能有一個線程仍在迭代epoll_wait(2)返回的事件,並在突然另一個線程使用相同數組調用epoll_wait(2)時處理請求,然後在另一個線程正在讀取它們的同時覆蓋事件。不好!你絕對需要爲每個線程單獨的數組。

假設你對每個線程都有一個單獨的數組,可能性 - 等待相同的epoll fd或每個線程都有一個單獨的epoll fd - 將同樣適用,但請注意語義不同。使用全局共享的epoll fd,每個線程都會等待來自的任何客戶端的請求,因爲客戶端都被添加到同一個epoll fd中。對於每個線程都有一個單獨的epoll fd,那麼每個線程基本上都負責一部分客戶端(被該線程接受的那些客戶端)。

這可能與您的系統無關,也可能會產生巨大差異。例如,一個線程可能會發生不幸,導致一羣高級用戶發出沉重而頻繁的請求,導致該線程工作過度,而其他具有較不積極客戶端的線程幾乎空閒。這不公平嗎?另一方面,也許你想只有一些線程處理特定類的用戶,並且在這種情況下,在每個線程上有不同的epoll fds是有意義的。像往常一樣,你需要考慮兩種可能性,評估權衡,考慮你的具體問題,並做出決定。

下面是一個使用全局共享epoll fd的示例。我本來並不打算這麼做,但有一件事導致了另一件事,而且,這很有趣,我認爲它可以幫助你開始。它是一個在端口3000上偵聽的echo服務器,並且有一個使用epoll的20個線程池來同時接受新客戶端和服務請求。

#include <stdio.h> 
#include <stdlib.h> 
#include <inttypes.h> 
#include <errno.h> 
#include <string.h> 
#include <pthread.h> 
#include <assert.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <sys/epoll.h> 

#define SERVERPORT 3000 
#define SERVERBACKLOG 10 
#define THREADSNO 20 
#define EVENTS_BUFF_SZ 256 

static int serversock; 
static int epoll_fd; 
static pthread_t threads[THREADSNO]; 

int accept_new_client(void) { 

    int clientsock; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     close(clientsock); 
     return -1; 
    } 

    printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port)); 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = clientsock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed attempting to add new client"); 
     close(clientsock); 
     return -1; 
    } 

    return 0; 
} 

int handle_request(int clientfd) { 
    char readbuff[512]; 
    struct sockaddr_in addr; 
    socklen_t addrlen = sizeof(addr); 
    ssize_t n; 

    if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) { 
     return -1; 
    } 

    if (n == 0) { 
     return 0; 
    } 

    readbuff[n] = '\0'; 

    if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) { 
     return -1; 
    } 

    char ip_buff[INET_ADDRSTRLEN+1]; 
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) { 
     return -1; 
    } 

    printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    ssize_t sent; 
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) { 
     return -1; 
    } 

    readbuff[sent] = '\0'; 

    printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(), 
      ip_buff, ntohs(addr.sin_port), readbuff); 

    return 0; 
} 

void *worker_thr(void *args) { 
    struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ); 
    if (events == NULL) { 
     perror("malloc(3) failed when attempting to allocate events buffer"); 
     pthread_exit(NULL); 
    } 

    int events_cnt; 
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) { 
     int i; 
     for (i = 0; i < events_cnt; i++) { 
      assert(events[i].events & EPOLLIN); 

      if (events[i].data.fd == serversock) { 
       if (accept_new_client() == -1) { 
        fprintf(stderr, "Error accepting new client: %s\n", 
         strerror(errno)); 
       } 
      } else { 
       if (handle_request(events[i].data.fd) == -1) { 
        fprintf(stderr, "Error handling request: %s\n", 
         strerror(errno)); 
       } 
      } 
     } 
    } 

    if (events_cnt == 0) { 
     fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?"); 
    } else { 
     perror("epoll_wait(2) error"); 
    } 

    free(events); 

    return NULL; 
} 

int main(void) { 
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { 
     perror("socket(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct sockaddr_in serveraddr; 
    serveraddr.sin_family = AF_INET; 
    serveraddr.sin_port = htons(SERVERPORT); 
    serveraddr.sin_addr.s_addr = INADDR_ANY; 

    if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) { 
     perror("bind(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if (listen(serversock, SERVERBACKLOG) < 0) { 
     perror("listen(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    if ((epoll_fd = epoll_create(1)) < 0) { 
     perror("epoll_create(2) failed"); 
     exit(EXIT_FAILURE); 
    } 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLET; 
    epevent.data.fd = serversock; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) { 
     perror("epoll_ctl(2) failed on main server socket"); 
     exit(EXIT_FAILURE); 
    } 

    int i; 
    for (i = 0; i < THREADSNO; i++) { 
     if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) { 
      perror("pthread_create(3) failed"); 
      exit(EXIT_FAILURE); 
     } 
    } 

    /* main thread also contributes as worker thread */ 
    worker_thr(NULL); 

    return 0; 
} 

有兩點要注意:

  • main()應該返回int,不void(如你在你的例子顯示)
  • 始終以錯誤返回代碼處理。忽視它們是非常普遍的,當事情破裂時很難知道發生了什麼。
  • 該代碼假定沒有請求大於511個字節(如handle_request()中的緩衝區大小所示)。如果請求大於此值,則有可能某些數據長時間留在套接字中,因爲epoll_wait(2)只有在該文件描述符上發生新事件(因爲我們使用的是EPOLLET)纔會報告它。在最糟糕的情況下,客戶可能永遠不會發送任何新數據,並永遠等待回覆。
  • 打印每個請求的線程標識符的代碼假定pthread_t是不透明的指針類型。實際上,pthread_t是Linux中的指針類型,但它可能是其他平臺中的整數類型,因此這不是可移植的。然而,這可能不是什麼大問題,因爲epoll是Linux專用的,所以代碼無論如何都是不可移植的。
  • 它假定當線程仍在提供來自該客戶端的請求時,不會有來自同一客戶端的其他請求到達。如果新的請求在此期間到達並且另一個線程開始提供服務,我們有競爭條件,並且客戶端不一定會按照發送給他們的相同順序接收到回顯消息(但是,write(2)是原子的,所以雖然答覆可能是沒有秩序,他們不會穿插)。
+0

感謝您的全面回答。這非常有幫助。 – MiJo

+0

@MiJo很高興我能幫到你。這是一個很好的問題:) –

相關問題