2015-07-21 52 views
2

this question我知道我可以撥打epoll_ctl(2)而另一個線程阻止epoll_wait(2)。我仍然有一個問題。正在爲epoll線程重新安裝文件描述符嗎?

當使用epollEPOLLONESHOT標誌時,只有一個事件被觸發,並且必須使用epoll_ctl(2)來重新組合fd。這是必要的,所以只有一個線程 將從fd中讀取並適當地處理結果。

以下是有點可視化我認爲問題的一個時間表:

Thread1:      Thread2:     Kernel: 
----------------------------------------------------------------------- 
epoll_wait(); 
                 Receives chunk 
dispatch chunk to thread 2 
epoll_wait();     Handle chunk 
           Still handle chunk  Receives chunk 
           Rearm fd for epoll 
? 

就當fd被一大塊後重新武裝的問號會發生什麼收到? epoll會觸發一個EPOLLIN事件,還是會無限期地阻止該套接字是否可讀?我的架構是否合理?

回答

2

您的架構是明智的,它會工作:epoll將文件描述符標記爲可讀並觸發EPOLLIN事件。

關於這方面的文檔是稀缺和微妙的;在Q/man 7 epoll簡要的部分提到這一點:

Q8是否對文件描述符的操作影響已收集 但尚未報告的事件?

A8您可以在 現有文件描述符上執行兩個操作。刪除對於這種情況將毫無意義。 修改將重讀可用的I/O。

兩個操作,您可以在現有的文件描述符做(現有的文件描述符已被添加到在過去設定的epoll的文件描述符 - 這包括正在等待重新裝備文件描述符)被刪除和修改。正如手冊頁提到的,刪除在這裏沒有意義,修改將重新評估文件描述符中的條件。

儘管如此,沒有什麼比現實世界的實驗更勝一籌。下面的程序測試該邊緣的情況下:

#include <stdio.h> 
#include <pthread.h> 
#include <signal.h> 
#include <stdlib.h> 
#include <assert.h> 
#include <semaphore.h> 
#include <sys/epoll.h> 
#include <unistd.h> 

static pthread_t tids[2]; 
static int epoll_fd; 
static char input_buff[512]; 
static sem_t chunks_sem; 

void *dispatcher(void *arg) { 
    struct epoll_event epevent; 

    while (1) { 
     printf("Dispatcher waiting for more chunks\n"); 
     if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) { 
      perror("epoll_wait(2) error"); 
      exit(EXIT_FAILURE); 
     } 

     ssize_t n; 
     if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) { 
      if (n < 0) 
       perror("read(2) error"); 
      else 
       fprintf(stderr, "stdin closed prematurely\n"); 
      exit(EXIT_FAILURE); 
     } 

     input_buff[n] = '\0'; 
     sem_post(&chunks_sem); 
    } 

    return NULL; 
} 

void *consumer(void *arg) { 
    sigset_t smask; 
    sigemptyset(&smask); 
    sigaddset(&smask, SIGUSR1); 

    while (1) { 
     sem_wait(&chunks_sem); 
     printf("Consumer received chunk: %s", input_buff); 
     /* Simulate some processing... */ 
     sleep(2); 
     printf("Consumer finished processing chunk.\n"); 
     printf("Please send SIGUSR1 after sending more data to stdin\n"); 

     int signo; 
     if (sigwait(&smask, &signo) < 0) { 
      perror("sigwait(3) error"); 
      exit(EXIT_FAILURE); 
     } 

     assert(signo == SIGUSR1); 

     struct epoll_event epevent; 
     epevent.events = EPOLLIN | EPOLLONESHOT; 
     epevent.data.fd = STDIN_FILENO; 

     if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) { 
      perror("epoll_ctl(2) error when attempting to readd stdin"); 
      exit(EXIT_FAILURE); 
     } 

     printf("Readded stdin to epoll fd\n"); 
    } 
} 

int main(void) { 

    sigset_t sigmask; 
    sigfillset(&sigmask); 
    if (pthread_sigmask(SIG_SETMASK, &sigmask, NULL) < 0) { 
     perror("pthread_sigmask(3) error"); 
     exit(EXIT_FAILURE); 
    } 

    if ((epoll_fd = epoll_create(1)) < 0) { 
     perror("epoll_create(2) error"); 
     exit(EXIT_FAILURE); 
    } 

    struct epoll_event epevent; 
    epevent.events = EPOLLIN | EPOLLONESHOT; 
    epevent.data.fd = STDIN_FILENO; 

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) { 
     perror("epoll_ctl(2) error"); 
     exit(EXIT_FAILURE); 
    } 

    if (sem_init(&chunks_sem, 0, 0) < 0) { 
     perror("sem_init(3) error"); 
     exit(EXIT_FAILURE); 
    } 

    if (pthread_create(&tids[0], NULL, dispatcher, NULL) < 0) { 
     perror("pthread_create(3) error on dispatcher"); 
     exit(EXIT_FAILURE); 
    } 

    if (pthread_create(&tids[1], NULL, consumer, NULL) < 0) { 
     perror("pthread_create(3) error on consumer"); 
     exit(EXIT_FAILURE); 
    } 

    size_t i; 
    for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) { 
     if (pthread_join(tids[i], NULL) < 0) { 
      perror("pthread_join(3) error"); 
      exit(EXIT_FAILURE); 
     } 
    } 

    return 0; 
} 

它的工作原理如下:在調度程序線程添加stdin到epoll的集合,然後使用epoll_wait(2)每當它變爲可讀從stdin取輸入。當輸入到達時,調度員喚醒工作線程,他打印輸入並通過睡眠2秒來模擬一些處理時間。同時,調度程序返回到主循環並再次在epoll_wait(2)中阻止。

工作者線程將不會重新編譯stdin,直到您通過發送它SIGUSR1來告訴它。所以,我們只是寫更多的東西到stdin,然後發送SIGUSR1的過程。工作線程收到信號,然後才重新加載stdin - 這個時間已經可讀,調度程序已經在等待epoll_wait(2)

您可以從輸出中看到,調度員正確喚醒,一切工作就像一個魅力:

Dispatcher waiting for more chunks 
testing 1 2 3 // Input 
Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again 
Consumer received chunk: testing 1 2 3 
Consumer finished processing chunk. 
Please send SIGUSR1 after sending more data to stdin 
hello world // Input 
Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting 
Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again 
Consumer received chunk: hello world 
Consumer finished processing chunk. 
Please send SIGUSR1 after sending more data to stdin