這裏有一個簡單的方法大綱,使用事件,而不是信號燈:
DWORD increment_offset(DWORD offset)
{
offset++;
if (offset == QUEUE_LENGTH*2) offset = 0;
return offset;
}
void consumer(void)
{
for (;;)
{
DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0);
if ((current_write_offset != *read_offset + QUEUE_LENGTH) &&
(current_write_offset + QUEUE_LENGTH != *read_offset))
{
// Queue is not full, make sure producer is awake
SetEvent(signal_producer_event);
}
if (*read_offset == current_write_offset)
{
// Queue is empty, wait for producer to add a message
WaitForSingleObject(signal_consumer_event, INFINITE);
continue;
}
MemoryBarrier();
_ReadWriteBarrier;
consume((*read_offset) % QUEUE_LENGTH);
InterlockedExchange(read_offset, increment_offset(*read_offset));
}
}
void producer(void)
{
for (;;)
{
DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if (current_read_offset != *write_offset)
{
// Queue is not empty, make sure consumer is awake
SetEvent(signal_consumer_event);
}
if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
(*write_offset + QUEUE_LENGTH == current_read_offset))
{
// Queue is full, wait for consumer to remove a message
WaitForSingleObject(signal_producer_event, INFINITE);
continue;
}
produce((*write_offset) % QUEUE_LENGTH);
MemoryBarrier();
_ReadWriteBarrier;
InterlockedExchange(write_offset, increment_offset(*write_offset));
}
}
注:
代碼張貼編譯(給予適當的聲明),但我有沒有其他測試它。
read_offset
是共享內存中指向DWORD
的指針,指示應從下一個讀取哪個插槽。類似地,write_offset
指向共享存儲器中的DWORD
,指示下一個應寫入哪個插槽。
QUEUE_LENGTH + x
的偏移量指的是與x
的偏移量相同的時隙,以消除完整隊列和空隊列之間的歧義。這就是爲什麼increment_offset()
函數檢查QUEUE_LENGTH*2
而不僅僅是QUEUE_LENGTH
,以及爲什麼我們調用consume()
和produce()
函數時取模。 (這種方法的一種替代方法是修改生產者從不使用最後一個可用插槽,但這會浪費一個插槽。)
signal_consumer_event
和signal_producer_event
必須是自動重置事件。請注意,設置已設置的事件是無操作的。
如果隊列實際上是空的,消費者僅等待其事件,並且如果隊列實際上已滿,則生產者僅等待其事件。
當任一進程被喚醒時,它必須重新檢查隊列的狀態,因爲存在可能導致虛假喚醒的爭用條件。
因爲我使用互鎖操作,並且因爲一次只有一個進程正在使用任何特定的插槽,所以不需要互斥鎖。我已經包含了內存障礙,以確保生產者寫入插槽的更改將被消費者看到。如果您不熟悉無鎖代碼,您會發現將所顯示的算法轉換爲使用互斥鎖並不重要。
請注意,InterlockedCompareExchange(pointer, 0, 0);
看起來有點複雜,但它只是一個線程安全等價於*pointer
,即它讀取指針處的值。同樣,InterlockedExchange(pointer, value);
與*pointer = value;
相同,但是線程安全。根據編譯器和目標體系結構的不同,互鎖操作可能不是絕對必要的,但性能影響可以忽略不計,因此我建議您進行防禦性編程。
考慮消費者在致電consume()
函數期間(或之前)崩潰的情況。當消費者重新啓動時,它會再次選取相同的消息並按正常方式處理。就製作人而言,沒有什麼不尋常的事情發生,除了消息比平時處理時間更長。如果生產者在創建消息時崩潰,則會出現類似的情況;重新啓動時,生成的第一條消息將覆蓋不完整的消息,並且消費者不會受到影響。顯然,如果崩潰發生在調用InterlockedExchange
之後,但在生產者或消費者調用SetEvent
之前發生,並且如果隊列先前是空的或已滿的,那麼另一個進程將不會在那個時候被喚醒點。但是,一旦崩潰的進程重新啓動,它就會被喚醒。您不能在隊列中丟失插槽,並且進程無法死鎖。
我認爲簡單的多生產單消費的情況會是這個樣子:
void producer(void)
{
for (;;)
{
DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if (current_read_offset != *write_offset)
{
// Queue is not empty, make sure consumer is awake
SetEvent(signal_consumer_event);
}
produce_in_local_cache();
claim_mutex();
// read offset may have changed, re-read it
current_read_offset = InterlockedCompareExchange(read_offset, 0, 0);
if ((*write_offset == current_read_offset + QUEUE_LENGTH) ||
(*write_offset + QUEUE_LENGTH == current_read_offset))
{
// Queue is full, wait for consumer to remove a message
WaitForSingleObject(signal_producer_event, INFINITE);
continue;
}
copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH);
MemoryBarrier();
_ReadWriteBarrier;
InterlockedExchange(write_offset, increment_offset(*write_offset));
release_mutex();
}
}
如果活動製片人崩潰,如廢棄的互斥體將被檢測;你可以像對待這個互斥體一樣正確地釋放這個情況。如果崩潰的進程增加了寫入偏移量,那麼它添加的條目將照常進行處理;如果不是,它會被任何一個製片人聲稱互斥體覆蓋。在任何情況下都不需要任何特殊行動。
你最好跟蹤共享內存塊中隊列的狀態,並使用事件而不是信號量。我無法給出更具體的建議;您對現有算法的描述過於模糊。 (隊列是一個環形緩衝區,或FIFO,或者什麼?當消費者在崩潰後重新啓動時,它如何知道首先要讀取哪個插槽?) –
在共享內存中管理的數據結構的細節在這裏並不重要。對於這個討論,假定一個帶有兩個偏移量的環形緩衝區,一個用於讀取一個指向恆定大小的槽的寫入。生產者僅使用和更改寫入偏移量和僅使用者使用者,並更改讀取偏移量。由於偏移量存儲在共享內存中,重新啓動的消費者簡單在崩潰之前讀取偏移量的位置處拾取。由於消費者沒有釋放寫信號量,因此沒有生產者超支的風險。 –
由於隊列的狀態部分保持在信號量的狀態,因此您提議跟蹤共享內存中的隊列狀態不起作用。如果消費者在完整的信號量下等待並且沒有在發生崩潰之前發出空信號量的信號,則完整項目的計數減少,但完整信號量的計數沒有增加。 不會發生數據溢出,但生產者現在對隊列中元素的總數有不正確的看法。 –