在幾個 線程中共享相同的Epoll fd(而不是套接字fd)是否安全。
是的,它是安全的 - 在epoll(7)
接口是線程安全的 - 但這樣做的時候,你就要小心了,你至少應該使用EPOLLET
(邊沿觸發模式,而不是默認的電平觸發)以避免其他線程中的虛假喚醒。這是因爲當新事件可用於處理時,電平觸發模式會喚醒每個線程。由於只有一個線程會處理它,所以會不必要地喚醒大多數線程。
如果共享的epfd用於將每個線程都必須通過它自己的事件 陣列或共享的事件數組epoll_wait()
是的,你需要每個線程在一個單獨的事件數組,或否則你會有競爭條件和令人討厭的事情可能發生。例如,您可能有一個線程仍在迭代epoll_wait(2)
返回的事件,並在突然另一個線程使用相同數組調用epoll_wait(2)
時處理請求,然後在另一個線程正在讀取它們的同時覆蓋事件。不好!你絕對需要爲每個線程單獨的數組。
假設你對每個線程都有一個單獨的數組,可能性 - 等待相同的epoll fd或每個線程都有一個單獨的epoll fd - 將同樣適用,但請注意語義不同。使用全局共享的epoll fd,每個線程都會等待來自的任何客戶端的請求,因爲客戶端都被添加到同一個epoll fd中。對於每個線程都有一個單獨的epoll fd,那麼每個線程基本上都負責一部分客戶端(被該線程接受的那些客戶端)。
這可能與您的系統無關,也可能會產生巨大差異。例如,一個線程可能會發生不幸,導致一羣高級用戶發出沉重而頻繁的請求,導致該線程工作過度,而其他具有較不積極客戶端的線程幾乎空閒。這不公平嗎?另一方面,也許你想只有一些線程處理特定類的用戶,並且在這種情況下,在每個線程上有不同的epoll fds是有意義的。像往常一樣,你需要考慮兩種可能性,評估權衡,考慮你的具體問題,並做出決定。
下面是一個使用全局共享epoll fd的示例。我本來並不打算這麼做,但有一件事導致了另一件事,而且,這很有趣,我認爲它可以幫助你開始。它是一個在端口3000上偵聽的echo服務器,並且有一個使用epoll的20個線程池來同時接受新客戶端和服務請求。
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define SERVERPORT 3000
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256
static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];
int accept_new_client(void) {
int clientsock;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
close(clientsock);
return -1;
}
printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port));
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = clientsock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
perror("epoll_ctl(2) failed attempting to add new client");
close(clientsock);
return -1;
}
return 0;
}
int handle_request(int clientfd) {
char readbuff[512];
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
ssize_t n;
if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
return -1;
}
if (n == 0) {
return 0;
}
readbuff[n] = '\0';
if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
return -1;
}
char ip_buff[INET_ADDRSTRLEN+1];
if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
return -1;
}
printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
ssize_t sent;
if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
return -1;
}
readbuff[sent] = '\0';
printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
ip_buff, ntohs(addr.sin_port), readbuff);
return 0;
}
void *worker_thr(void *args) {
struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
if (events == NULL) {
perror("malloc(3) failed when attempting to allocate events buffer");
pthread_exit(NULL);
}
int events_cnt;
while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
int i;
for (i = 0; i < events_cnt; i++) {
assert(events[i].events & EPOLLIN);
if (events[i].data.fd == serversock) {
if (accept_new_client() == -1) {
fprintf(stderr, "Error accepting new client: %s\n",
strerror(errno));
}
} else {
if (handle_request(events[i].data.fd) == -1) {
fprintf(stderr, "Error handling request: %s\n",
strerror(errno));
}
}
}
}
if (events_cnt == 0) {
fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
} else {
perror("epoll_wait(2) error");
}
free(events);
return NULL;
}
int main(void) {
if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
perror("socket(2) failed");
exit(EXIT_FAILURE);
}
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(SERVERPORT);
serveraddr.sin_addr.s_addr = INADDR_ANY;
if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
perror("bind(2) failed");
exit(EXIT_FAILURE);
}
if (listen(serversock, SERVERBACKLOG) < 0) {
perror("listen(2) failed");
exit(EXIT_FAILURE);
}
if ((epoll_fd = epoll_create(1)) < 0) {
perror("epoll_create(2) failed");
exit(EXIT_FAILURE);
}
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLET;
epevent.data.fd = serversock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
perror("epoll_ctl(2) failed on main server socket");
exit(EXIT_FAILURE);
}
int i;
for (i = 0; i < THREADSNO; i++) {
if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
perror("pthread_create(3) failed");
exit(EXIT_FAILURE);
}
}
/* main thread also contributes as worker thread */
worker_thr(NULL);
return 0;
}
有兩點要注意:
main()
應該返回int
,不void
(如你在你的例子顯示)
- 始終以錯誤返回代碼處理。忽視它們是非常普遍的,當事情破裂時很難知道發生了什麼。
- 該代碼假定沒有請求大於511個字節(如
handle_request()
中的緩衝區大小所示)。如果請求大於此值,則有可能某些數據長時間留在套接字中,因爲epoll_wait(2)
只有在該文件描述符上發生新事件(因爲我們使用的是EPOLLET
)纔會報告它。在最糟糕的情況下,客戶可能永遠不會發送任何新數據,並永遠等待回覆。
- 打印每個請求的線程標識符的代碼假定
pthread_t
是不透明的指針類型。實際上,pthread_t
是Linux中的指針類型,但它可能是其他平臺中的整數類型,因此這不是可移植的。然而,這可能不是什麼大問題,因爲epoll是Linux專用的,所以代碼無論如何都是不可移植的。
- 它假定當線程仍在提供來自該客戶端的請求時,不會有來自同一客戶端的其他請求到達。如果新的請求在此期間到達並且另一個線程開始提供服務,我們有競爭條件,並且客戶端不一定會按照發送給他們的相同順序接收到回顯消息(但是,
write(2)
是原子的,所以雖然答覆可能是沒有秩序,他們不會穿插)。
感謝您的全面回答。這非常有幫助。 – MiJo
@MiJo很高興我能幫到你。這是一個很好的問題:) –