2013-04-29 72 views
5

最近我一直在玩IPC的共享內存。我一直試圖實現的一件事是一個簡單的環形緩衝區,其中有1個進程生產和1個進程消耗。每個進程都有自己的序列號來跟蹤其位置。這些序列號使用原子操作符更新,以確保正確的值對其他進程可見。一旦環形緩衝區滿了,生產者將會阻塞。該代碼是無鎖的,因爲沒有使用信號量或互斥鎖。共享內存中的單生產者/消費者環緩衝區

在性能方面,我在自己相當普通的虛擬機上達到了每秒大約20萬條消息——對此我很滿意 :)

我好奇的是我的代碼到底有多「正確」。有人能發現其中任何固有的問題或競爭條件嗎?以下是我的代碼。提前感謝您的任何意見。

#include <stdlib.h> 
#include <stdio.h> 
#include <fcntl.h> 
#include <sys/mman.h> 
#include <sys/stat.h> 
#include <time.h> 
#include <unistd.h> 
#include <string.h> 

#define SHM_ID "/mmap-test" 
#define BUFFER_SIZE 4096 
#define SLEEP_NANOS 1000 // 1 micro 

// A single fixed-size entry in the ring buffer.
struct Message 
{ 
    long _id;        // monotonically increasing id assigned by the producer
    char _data[128]; // opaque payload (never written in this example)
}; 

// Shared-memory layout: two sequence counters plus the message slots.
// The producer owns _wseq, the consumer owns _rseq; each only reads the
// other's counter.
struct RingBuffer 
{ 
    size_t _rseq;    // read sequence: next slot the consumer will read
    char _pad1[64];  // padding — presumably to keep _rseq and _wseq on
                     // separate cache lines (false sharing); note 64 bytes
                     // is added *after* the field, so _rseq and _wseq are
                     // only separated, not each alone on its own line

    size_t _wseq;    // write sequence: next slot the producer will write
    char _pad2[64];  // pads _wseq away from _buffer

    Message _buffer[BUFFER_SIZE];  // the message slots; indices are seq % BUFFER_SIZE
}; 

// Producer side: creates/sizes the shared memory object, maps it, then
// spins writing messages until the ring is full, sleeping briefly to let
// the consumer catch up. Never returns except on setup failure.
void
producerLoop()
{
    size_t size = sizeof(RingBuffer);

    int fd = shm_open(SHM_ID, O_RDWR | O_CREAT, 0600);
    if(fd == -1) {
        perror("shm_open"); return;
    }
    // Size the object to exactly the ring buffer (the original sized it
    // size+1 for no reason; the mapping below only covers `size` bytes).
    if(ftruncate(fd, size) == -1) {
        perror("ftruncate"); close(fd); return;
    }

    // create shared memory area
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // the mapping remains valid after the descriptor is closed
    if(rb == MAP_FAILED) {
        perror("mmap"); return;
    }

    // initialize our sequence numbers in the ring buffer
    // NOTE(review): this is only safe if the producer is started before the
    // consumer attaches; a consumer racing this store could observe garbage.
    rb->_wseq = rb->_rseq = 0;
    long i = 0;  // long matches Message::_id; an int here would eventually
                 // overflow (UB) at sustained message rates

    timespec tss;
    tss.tv_sec = 0;
    tss.tv_nsec = SLEEP_NANOS;

    while(1)
    {
        // As long as the consumer isn't running behind, keep producing.
        // One slot is kept empty so "full" and "empty" are distinguishable.
        while((rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE)
        {
            // Write the next entry, then publish it by atomically bumping
            // the write sequence (__sync_fetch_and_add is a full barrier,
            // so the slot contents are visible before the new _wseq).
            Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE];
            msg->_id = i++;
            __sync_fetch_and_add(&rb->_wseq, 1);
        }

        // buffer full: give consumer some time to catch up
        nanosleep(&tss, 0);
    }
}

// Consumer side: attaches to the producer's shared memory object and spins
// draining messages, verifying ids increase by exactly 1. Publishes its
// read position back so the producer can reuse slots. Never returns except
// on setup failure or a sequence error.
void
consumerLoop()
{
    size_t size = sizeof(RingBuffer);

    int fd = shm_open(SHM_ID, O_RDWR, 0600);
    if(fd == -1) {
        perror("shm_open"); return;
    }

    // lookup producers shared memory area
    RingBuffer* rb = (RingBuffer*)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);  // fix: the original leaked this descriptor
    if(rb == MAP_FAILED) {
        perror("mmap"); return;
    }

    size_t seq = 0;  // local read cursor (only published after a batch)
    long pid = -1;   // previous message id; long matches Message::_id so the
                     // printf below has matching conversion specifiers

    timespec tss;
    tss.tv_sec = 0;
    tss.tv_nsec = SLEEP_NANOS;

    while(1)
    {
        // while there is data to consume
        while(seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE)
        {
            // Get the next message and validate the id: it should only ever
            // increase by 1. Quit immediately if not.
            Message msg = rb->_buffer[seq%BUFFER_SIZE];
            if(msg._id != pid+1) {
                // fix: original used %d for a long and a size_t (UB)
                printf("error: %ld %ld\n", msg._id, pid); return;
            }
            pid = msg._id;
            ++seq;
        }

        // atomically update the read sequence in the ring buffer
        // making it visible to the producer (full barrier)
        __sync_lock_test_and_set(&rb->_rseq, seq);

        // wait for more data
        nanosleep(&tss, 0);
    }
}

// Entry point: dispatch to the producer or consumer loop based on the
// single command-line argument.
int
main(int argc, char** argv)
{
    if(argc != 2) {
        printf("please supply args (producer/consumer)\n");
        return -1;
    }

    const char* mode = argv[1];
    if(strcmp(mode, "consumer") == 0) {
        consumerLoop();
        return 0;
    }
    if(strcmp(mode, "producer") == 0) {
        producerLoop();
        return 0;
    }

    printf("invalid arg: %s\n", mode);
    return -1;
}

回答

1

乍看似乎對我很正確。我意識到你對性能感到滿意,但一個有趣的實驗可能是使用比__sync_fetch_and_add更輕量級的東西。 AFAIK它是一個完整的內存屏障,這是昂貴的。由於只有一個生產者和一個消費者,所以發佈和相應的獲取操作應該會給你更好的性能。 Facebook的Folly庫有一個生產者單消費者隊列,它使用新的C++ 11 atomics:https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h

相關問題