2014-12-18 79 views
1

我有一個使用共享FIFO的實時應用程序。有幾個作家流程和一個讀者流程。數據被定期寫入FIFO並不斷消耗。從理論上講,FIFO不應該溢出,因爲讀取速度比所有寫入器的組合速度要快。但是,FIFO確實溢出。在Linux上喚醒sem_wait()失敗

我試圖重現該問題終於摸索出以下(簡化)代碼:

#include <stdint.h> 
#include <cstdio> 
#include <cstdlib> 
#include <cstring> 
#include <cassert> 
#include <pthread.h> 
#include <semaphore.h> 
#include <sys/time.h> 
#include <unistd.h> 


class Fifo 
{ 
public: 
    Fifo() : _deq(0), _wptr(0), _rptr(0), _lock(0) 
    { 
     memset(_data, 0, sizeof(_data)); 
     sem_init(&_data_avail, 1, 0); 
    } 

    ~Fifo() 
    { 
     sem_destroy(&_data_avail); 
    } 

    void Enqueue() 
    { 
     struct timeval tv; 
     gettimeofday(&tv, NULL); 
     uint64_t enq = tv.tv_usec + tv.tv_sec * 1000000; 
     while (__sync_lock_test_and_set(&_lock, 1)) 
      sched_yield(); 
     uint8_t wptr = _wptr; 
     uint8_t next_wptr = (wptr + 1) % c_entries; 
     int retry = 0; 
     while (next_wptr == _rptr)  // will become full 
     { 
      printf("retry=%u enq=%lu deq=%lu count=%d\n", retry, enq, _deq, Count()); 
      for (uint8_t i = _rptr; i != _wptr; i = (i+1)%c_entries) 
       printf("%u: %lu\n", i, _data[i]); 
      assert(retry++ < 2); 
      usleep(500); 
     } 
     assert(__sync_bool_compare_and_swap(&_wptr, wptr, next_wptr)); 
     _data[wptr] = enq; 
     __sync_lock_release(&_lock); 
     sem_post(&_data_avail); 
    } 

    int Dequeue() 
    { 
     struct timeval tv; 
     gettimeofday(&tv, NULL); 
     uint64_t deq = tv.tv_usec + tv.tv_sec * 1000000; 
     _deq = deq; 
     uint8_t rptr = _rptr, wptr = _wptr; 
     uint8_t next_rptr = (rptr + 1) % c_entries; 
     bool empty = Count() == 0; 
     assert(!sem_wait(&_data_avail));// bug in sem_wait? 
     _deq = 0; 
     uint64_t enq = _data[rptr];  // enqueue time 
     assert(__sync_bool_compare_and_swap(&_rptr, rptr, next_rptr)); 
     int latency = deq - enq;  // latency from enqueue to dequeue 
     if (empty && latency < -500) 
     { 
      printf("before dequeue: w=%u r=%u; after dequeue: w=%u r=%u; %d\n", wptr, rptr, _wptr, _rptr, latency); 
     } 
     return latency; 
    } 

    int Count() 
    { 
     int count = 0; 
     assert(!sem_getvalue(&_data_avail, &count)); 
     return count; 
    } 

    static const unsigned c_entries = 16; 

private: 
    sem_t _data_avail; 
    uint64_t _data[c_entries]; 
    volatile uint64_t _deq;  // non-0 indicates when dequeue happened 
    volatile uint8_t _wptr, _rptr;  // write, read pointers 
    volatile uint8_t _lock;  // write lock 
}; 


static const unsigned c_total = 10000000; 
static const unsigned c_writers = 3; 

static Fifo s_fifo; 


// writer thread 
void* Writer(void* arg) 
{ 
    for (unsigned i = 0; i < c_total; i++) 
    { 
     int t = rand() % 200 + 200;  // [200, 399] 
     usleep(t); 
     s_fifo.Enqueue(); 
    } 
    return NULL; 
} 

int main() 
{ 
    pthread_t thread[c_writers]; 
    for (unsigned i = 0; i < c_writers; i++) 
     pthread_create(&thread[i], NULL, Writer, NULL); 

    for (unsigned total = 0; total < c_total*c_writers; total++) 
     s_fifo.Dequeue(); 
} 

當排隊()溢出,調試打印指示出列()被卡住(因爲_deq不爲0 )。 Dequeue()可以卡住的唯一地方是sem_wait()。但是,由於fifo已滿(也由sem_getvalue()確認),我不明白這是如何發生的。即使經過多次重試(每次等待500us),即使在Enqueue()完全停止(繁忙重試)時Dequeue()肯定應該排空,FIFO仍然是滿的。

在代碼示例中,有3個編寫者,每個編寫者每個編寫200-400us。在我的電腦上(運行centOS 6.5 kernel 2.6.32-279.22.1.el6.x86_64的8核i7-2860,g ++ 4.47 20120313),代碼會在幾分鐘內失敗。我也嘗試了其他幾個centOS系統,並且也以同樣的方式失敗。我知道使fifo更大可以減少溢出概率(事實上,程序仍然會失敗,c_entries = 128),但是在我的實時應用程序中,enqueue-dequeue延遲存在嚴格的限制,所以數據必須是快速流失。如果它不是sem_wait()中的錯誤,那麼是什麼阻止它得到信號?

P.S.如果我更換

 assert(!sem_wait(&_data_avail));// bug in sem_wait? 

 while (sem_trywait(&_data_avail) < 0) sched_yield(); 

然後該程序運行正常。所以似乎在sem_wait()和/或調度程序中有問題。

+0

我沒有仔細看過你的代碼,但是從你的描述來看,這聽起來像你取決於你的假設,即閱讀比寫作更快。雖然這可能是事實,但您必須始終假定調度程序將選擇允許選擇的最差可能時間表。 – kec 2014-12-18 23:31:13

+0

您是否嘗試過使用'sem_init(&_ data_avail,0,0)'初始化'sem'? – agbinfo 2014-12-19 15:58:00

+0

是的,它是一樣的。 – zhao 2014-12-19 17:01:06

回答

0

您需要使用sem_wait/sem_post調用的組合才能管理您的讀取和寫入線程。

您的排隊線程只執行一個sem_post並且您的出列隊列僅執行sem_wait調用。您需要將sem_wait添加到入隊線程和出隊線程上的sem_post。

很久以前,我實現了讓多個線程/進程能夠讀取某些共享內存並且只有一個線程/進程寫入共享內存的功能。我使用了兩個信號量,一個寫信號量和一個讀信號量。讀線程會一直等到寫信號未被設置,然後它纔會設置讀信號量。寫入線程將設置寫入信號量,然後等待直到讀取信號量未設置。讀寫線程在完成任務時將取消設置的信號量。讀信號量一次可以有n個線程鎖定讀取信號量,而寫信號量可以一次由單個線程鎖定。

+0

爲什麼我必須使用兩個信號量?我不能限制寫入,因爲數據流入。如果寫入過程必須sem_wait(),那麼寫入數據將丟失,這與FIFO溢出一樣糟糕。 – zhao 2014-12-19 03:50:20

0

如果它不是sem_wait()中的錯誤,那麼是什麼阻止它得到信號量 ?

程序的急躁阻止了它。不能保證Dequeue()線程在給定的重試次數內被調度。如果更改

  assert(retry++ < 2); 

  retry++; 

你會看到該程序高高興興地繼續閱讀過程中有時經過8個甚或更重試。