2013-07-08 23 views
1

Linux select()調用中繼事件排序有什麼方法嗎?Linux select()和多個套接字的FIFO排序?

我所看到的描述:

在一臺機器,我寫了一個簡單的程序,發送三個多播包,一個給每個三種不同的組播組。這些數據包是背對背發送的,兩者之間沒有任何延遲。即SENDTO(mcast_group1); SENDTO(mcast_group2); SENDTO(mcast_group3)。

在另一臺機器上,我有一個接收程序。該程序每個多播組使用一個套接字。每個套接字對其偵聽的地址執行bind()和IP_ADD_MEMBERSHIP(即加入/訂閱)。程序然後在三個套接字上執行select()。

當選擇返回時,所有三個套接字都可供讀取。但是哪一個最先?準備閱讀的套接字列表是一個集合,因此沒有順序。我想要的是,如果select()每個接收到的數據包只返回一次,按順序(這裏增加的開銷是可以接受的)。或者,還有其他一些機制可以用來確定數據包接收順序嗎?

其他信息:

  • OS是CentOS 5的(有效的紅帽企業版Linux)在x86_64
  • NIC硬件是英特爾82571EB
  • 我已經試過E1000E驅動程序版本1.3.10-k2和2.1.4-NAPI
  • 我試圖牽制網卡的中斷空載和孤立的CPU核心
  • 我已經禁用硬件IRQ通過設置驅動器選項將InterruptThrottleRate = 0合併,並設置婷RX-微秒(usecs)= 0的ethtool通過
  • 我也嘗試過使用epoll的,並且它具有相同的行爲

最後一句話:如果我只使用一個插座的分組排序被保留。在這種情況下,我將綁定到INADDR_ANY(0.0.0.0)並在同一個套接字上多次執行IP_ADD_MEMBERSHIP。但是這對我們的應用程序不起作用,因爲我們需要通過綁定到實際的多播地址來提供過濾。最終,在同一臺機器上將會有多個多播接收程序,並且訂閱集可能會相互交叉。所以也許另一種解決方案是找到另一種方法來實現bind()的過濾效果,但不使用bind()。

回答

0

如果select()返回大於1,那麼事件必須非常接近才能使排序問題變得毫無意義。

1

您可以使用IP_PKTINFO將數據包被髮送到組播組的地址 - 即使套接字認購一堆組播組。完成此操作後,您將按順序獲取數據包並按組地址進行過濾。看下面的例子:

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <sys/stat.h> 
#include <ctype.h> 
#include <errno.h> 

#define PORT 1234 
#define PPANIC(msg) perror(msg); exit(1); 
#define STATS_PATCH 0 

int main(int argc, char **argv) 
{ 
    fd_set master; 
    fd_set read_fds; 
    struct sockaddr_in serveraddr; 
    int sock; 
    int opt = 1; 
    size_t i; 
    int rc; 

    char *mcast_groups[] = { 
     "226.0.0.1", 
     "226.0.0.2", 
     NULL 
    }; 
#if STATS_PATCH 
    struct stat stat_buf; 
#endif 

    struct ip_mreq imreq; 

    FD_ZERO(&master); 
    FD_ZERO(&read_fds); 

    rc = sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 
    if(rc == -1) 
    { 
     PPANIC("socket() failed"); 
    } 

    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 
    if(rc == -1) 
    { 
     PPANIC("setsockopt(reuse) failed"); 
    } 

    memset(&serveraddr, 0, sizeof(serveraddr)); 
    serveraddr.sin_family = AF_INET; 
    serveraddr.sin_port = htons(PORT); 
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); 

    rc = bind(sock, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); 
    if(rc == -1) 
    { 
     PPANIC("bind() failed"); 
    } 

    rc = setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt)); 
    if(rc == -1) 
    { 
     PPANIC("setsockopt(IP_PKTINFO) failed"); 
    } 

    for (i = 0; mcast_groups[i] != NULL; i++) 
    { 
     imreq.imr_multiaddr.s_addr = inet_addr(mcast_groups[i]); 
     imreq.imr_interface.s_addr = INADDR_ANY; 
     rc = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(struct ip_mreq)); 
     if (rc != 0) 
     { 
      PPANIC("joing mcast group failed"); 
     } 
    } 

    FD_SET(sock, &master); 

    while(1) 
    { 
     read_fds = master; 
     rc = select(sock + 1, &read_fds, NULL, NULL, NULL); 

     if (rc == 0) 
     { 
      continue; 
     } 

     if(rc == -1) 
     { 
      PPANIC("select() failed"); 
     } 

     if(FD_ISSET(sock, &read_fds)) 
     { 
      char buf[1024]; 
      int inb; 
      char ctrl_msg_buf[1024]; 
      struct iovec iov[1]; 
      iov[0].iov_base = buf; 
      iov[0].iov_len = 1024; 
      struct msghdr msg_hdr = { 
       .msg_iov = iov, 
       .msg_iovlen = 1, 
       .msg_name = NULL, 
       .msg_namelen = 0, 
       .msg_control = ctrl_msg_buf, 
       .msg_controllen = sizeof(ctrl_msg_buf), 
      }; 
      struct cmsghdr *ctrl_msg_hdr; 

      inb = recvmsg(sock, &msg_hdr, 0); 
      if (inb < 0) 
      { 
       PPANIC("recvmsg() failed"); 
      } 

      for (ctrl_msg_hdr = CMSG_FIRSTHDR(&msg_hdr); ctrl_msg_hdr != NULL; ctrl_msg_hdr = CMSG_NXTHDR(&msg_hdr, ctrl_msg_hdr)) 
      { 
       if (ctrl_msg_hdr->cmsg_level == IPPROTO_IP && ctrl_msg_hdr->cmsg_type == IP_PKTINFO) 
       { 
        struct in_pktinfo *pckt_info = (struct in_pktinfo *)CMSG_DATA(ctrl_msg_hdr); 
        printf("got data for mcast group: %s\n", inet_ntoa(pckt_info->ipi_addr)); 
        break; 
       } 
      } 

      printf("|"); 
      for (i = 0; i < inb; i++) 
       printf("%c", isprint(buf[i])?buf[i]:'?'); 
      printf("|\n"); 
#if STATS_PATCH 
      rc = fstat(sock, &stat_buf); 
      if (rc == -1) 
      { 
       perror("fstat() failed"); 
      } else { 
       printf("st_atime: %d\n", stat_buf.st_atime); 
       printf("st_mtime: %d\n", stat_buf.st_mtime); 
       printf("st_ctime: %d\n", stat_buf.st_ctime); 
      } 
#endif 
     } 
    } 

    return 0; 
} 

下面的代碼不會解決有機磷農藥的問題,但可以指導人們處理類似要求

(EDIT)一個人不應該在深夜做這樣的事情。 ..即使採用這種解決方案,您只會得到fd由select處理的訂單 - 這將不會提示幀到達的時間。

正如here所述,由於沒有爲socket inode設置所需的回調,因此目前無法檢索套接字的順序或更改的時間戳。但是如果你能修補你的內核,你可以通過在選擇系統調用中設置時間來解決這個問題。

下面的補丁可以給你一個想法:

diff --git a/fs/select.c b/fs/select.c 
index 467bb1c..3f2927e 100644 
--- a/fs/select.c 
+++ b/fs/select.c 
@@ -435,6 +435,9 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
     for (i = 0; i < n; ++rinp, ++routp, ++rexp) { 
      unsigned long in, out, ex, all_bits, bit = 1, mask, j; 
      unsigned long res_in = 0, res_out = 0, res_ex = 0; 
+   struct timeval tv; 
+   
+   do_gettimeofday(&tv); 

      in = *inp++; out = *outp++; ex = *exp++; 
      all_bits = in | out | ex; 
@@ -452,6 +455,16 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
       f = fdget(i); 
       if (f.file) { 
        const struct file_operations *f_op; 
+     struct kstat stat; 
+     
+     int ret; 
+     u8 is_sock = 0; 
+ 
+     ret = vfs_getattr(&f.file->f_path, &stat); 
+     if(ret == 0 && S_ISSOCK(stat.mode)) { 
+      is_sock = 1; 
+     } 
+     
        f_op = f.file->f_op; 
        mask = DEFAULT_POLLMASK; 
        if (f_op->poll) { 
@@ -464,16 +477,22 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
         res_in |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        if ((mask & POLLOUT_SET) && (out & bit)) { 
         res_out |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        if ((mask & POLLEX_SET) && (ex & bit)) { 
         res_ex |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        /* got something, stop busy polling */ 
        if (retval) { 

注:

  1. 這是......只爲你:) - 不要指望它在主線

  2. do_gettimeofday()被稱爲之前每個相關的fd被測試。 以獲得更高的粒度,這應該在每次迭代中完成(並且僅在需要時)。由於stat接口只提供了一秒的粒度,所以你可以(!!很好!)使用剩餘的時間屬性將秒的小數部分映射到這些字段。

  3. 這是使用內核3.16.0完成的,沒有很好的測試。不要在太空船或醫療設備中使用它。如果您想嘗試一下,得到一個文件系統映像(如https://people.debian.org/~aurel32/qemu/amd64/debian_wheezy_amd64_standard.qcow2)和QEMU使用來測試它:

    須藤QEMU系統-x86_64的-kernel弓/ 86 /啓動/ bzImage的-hda debian_wheezy_amd64_standard.qcow2 - 追加「root =/dev/sda1」