我有一個使用共享FIFO的實時應用程序:有幾個寫入者線程和一個讀取者線程。數據被定期寫入FIFO並不斷被消耗。理論上FIFO不應該溢出,因爲讀取速度比所有寫入者合計的寫入速度還要快。但是,FIFO確實溢出了——sem_wait()在Linux上未能被喚醒。
我試圖重現該問題,最終寫出了以下(簡化的)重現代碼:
#include <stdint.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cassert>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <unistd.h>
// Fixed-size single-reader / multi-writer FIFO of enqueue timestamps.
// Writers serialize on a test-and-set spinlock (_lock); the reader
// blocks on a POSIX semaphore (_data_avail) whose value tracks the
// number of filled slots. Usable capacity is c_entries - 1: one slot
// is sacrificed so "full" (next_wptr == _rptr) is distinct from "empty".
class Fifo
{
public:
// pshared=1 requests a process-shared semaphore even though only
// threads of a single process use it; the author reports pshared=0
// behaves identically (see the discussion below the code).
Fifo() : _deq(0), _wptr(0), _rptr(0), _lock(0)
{
memset(_data, 0, sizeof(_data));
sem_init(&_data_avail, 1, 0);
}
~Fifo()
{
sem_destroy(&_data_avail);
}
// Called by any writer thread: stores the current wall time (us) into
// the next free slot, then posts the semaphore to wake the reader.
// Busy-retries (500us apart) while the FIFO is full and asserts after
// three failed retries — the condition under investigation.
void Enqueue()
{
struct timeval tv;
gettimeofday(&tv, NULL);
uint64_t enq = tv.tv_usec + tv.tv_sec * 1000000; // wall time in microseconds
// Acquire the writers' spinlock, yielding while contended.
while (__sync_lock_test_and_set(&_lock, 1))
sched_yield();
uint8_t wptr = _wptr;
uint8_t next_wptr = (wptr + 1) % c_entries;
int retry = 0;
while (next_wptr == _rptr) // will become full
{
// FIFO full: dump its contents for diagnosis, then give the
// reader up to ~1.5ms (3 x 500us) to drain before asserting.
printf("retry=%u enq=%lu deq=%lu count=%d\n", retry, enq, _deq, Count());
for (uint8_t i = _rptr; i != _wptr; i = (i+1)%c_entries)
printf("%u: %lu\n", i, _data[i]);
assert(retry++ < 2);
usleep(500);
}
// Only writers modify _wptr and we hold the lock, so this CAS
// cannot fail; the assert documents that invariant.
assert(__sync_bool_compare_and_swap(&_wptr, wptr, next_wptr));
_data[wptr] = enq;
__sync_lock_release(&_lock);
// Post after the slot is written so the reader never sees stale data.
sem_post(&_data_avail);
}
// Called only by the single reader thread: blocks until a slot is
// available, consumes it, and returns the enqueue-to-dequeue latency
// in microseconds.
int Dequeue()
{
struct timeval tv;
gettimeofday(&tv, NULL);
uint64_t deq = tv.tv_usec + tv.tv_sec * 1000000;
_deq = deq; // non-zero marks "reader is inside Dequeue" for the writers' debug dump
uint8_t rptr = _rptr, wptr = _wptr; // wptr captured only for the debug print below
uint8_t next_rptr = (rptr + 1) % c_entries;
bool empty = Count() == 0;
// NOTE(review): glibc before 2.21 had a lost-wakeup race in
// sem_post/sem_wait (glibc bug 12674, fixed by the new semaphore
// algorithm in 2.21). CentOS 6.x ships glibc 2.12, which matches
// the reported hang — worth verifying before suspecting this code.
assert(!sem_wait(&_data_avail));// bug in sem_wait?
_deq = 0;
uint64_t enq = _data[rptr]; // enqueue time
// Single reader, so only this thread modifies _rptr: CAS cannot fail.
assert(__sync_bool_compare_and_swap(&_rptr, rptr, next_rptr));
int latency = deq - enq; // latency from enqueue to dequeue
// Large negative latency with an empty pre-wait FIFO means the reader
// slept in sem_wait long after the data had already been posted.
if (empty && latency < -500)
{
printf("before dequeue: w=%u r=%u; after dequeue: w=%u r=%u; %d\n", wptr, rptr, _wptr, _rptr, latency);
}
return latency;
}
// Current semaphore value == number of filled slots.
int Count()
{
int count = 0;
assert(!sem_getvalue(&_data_avail, &count));
return count;
}
static const unsigned c_entries = 16; // ring size; usable capacity is c_entries - 1
private:
sem_t _data_avail; // counts filled slots; reader blocks on it
uint64_t _data[c_entries]; // enqueue timestamps (us)
volatile uint64_t _deq; // non-0 indicates when dequeue happened
volatile uint8_t _wptr, _rptr; // write, read pointers
volatile uint8_t _lock; // write lock
};
static const unsigned c_total = 10000000; // enqueues performed by each writer thread
static const unsigned c_writers = 3; // number of concurrent writer threads
static Fifo s_fifo; // the shared FIFO under test (one reader: main)
// Writer thread entry point: enqueues c_total timestamps, sleeping a
// random 200-399us between enqueues to stagger the writers.
// Returns NULL; the arg parameter is unused.
void* Writer(void* arg)
{
    (void)arg;
    // rand() is not required to be thread-safe and here it is called
    // concurrently from several threads; use rand_r() with a per-thread
    // seed (derived from a stack address, unique per thread) instead.
    unsigned seed = (unsigned)(uintptr_t)&seed;
    for (unsigned i = 0; i < c_total; i++)
    {
        int t = rand_r(&seed) % 200 + 200; // [200, 399] microseconds
        usleep(t);
        s_fifo.Enqueue();
    }
    return NULL;
}
// Spawn the writer threads, then (as the single reader) drain exactly
// c_total * c_writers entries — one Dequeue per expected Enqueue.
// Returns 0 on success, 1 if a writer thread could not be created.
int main()
{
    pthread_t thread[c_writers];
    for (unsigned i = 0; i < c_writers; i++)
    {
        // Abort early on failure instead of blocking forever in Dequeue
        // waiting for entries that will never be produced.
        if (pthread_create(&thread[i], NULL, Writer, NULL) != 0)
        {
            perror("pthread_create");
            return 1;
        }
    }
    for (unsigned total = 0; total < c_total*c_writers; total++)
        s_fifo.Dequeue();
    // All entries consumed; reap the writers so the process exits cleanly.
    for (unsigned i = 0; i < c_writers; i++)
        pthread_join(thread[i], NULL);
    return 0;
}
當Enqueue()發現FIFO溢出時,調試打印表明Dequeue()被卡住了(因爲_deq不爲0)。Dequeue()唯一可能卡住的地方是sem_wait()。但是,由於FIFO已滿(sem_getvalue()也確認了這一點),我不明白這是如何發生的。即使經過多次重試(每次等待500us)——此時Enqueue()已完全停在忙重試中,Dequeue()本應把隊列排空——FIFO仍然是滿的。
在代碼示例中,有3個寫入者,每個寫入者每隔200-400us寫入一次。在我的電腦上(8核i7-2860,運行CentOS 6.5,內核2.6.32-279.22.1.el6.x86_64,g++ 4.4.7 20120313),該代碼會在幾分鐘內失敗。我在其他幾個CentOS系統上也試過,同樣以這種方式失敗。我知道把FIFO加大可以降低溢出概率(事實上,即使c_entries = 128,程序仍然會失敗),但在我的實時應用中,enqueue到dequeue的延遲有嚴格限制,所以數據必須被快速消耗。如果這不是sem_wait()中的bug,那麼是什麼阻止了它收到信號?
P.S.如果我更換
assert(!sem_wait(&_data_avail));// bug in sem_wait?
與
while (sem_trywait(&_data_avail) < 0) sched_yield();
然後該程序運行正常。所以似乎在sem_wait()和/或調度程序中有問題。
我沒有仔細看過你的代碼,但是從你的描述來看,這聽起來像你取決於你的假設,即閱讀比寫作更快。雖然這可能是事實,但您必須始終假定調度程序將選擇允許選擇的最差可能時間表。 – kec 2014-12-18 23:31:13
您是否嘗試過使用'sem_init(&_ data_avail,0,0)'初始化'sem'? – agbinfo 2014-12-19 15:58:00
是的,它是一樣的。 – zhao 2014-12-19 17:01:06