2017-08-15 41 views
0

我在Windows上爲一個生產者和一個消費者在共享內存中實現了一個進程間消息隊列。如何保護進程間生產者消費者消息傳遞機制免受因一方崩潰而導致的損壞?

我使用一個命名信號量來算空槽,一個信號燈入選計數滿插槽和一個命名的mutex來保護共享內存中的數據結構。

考慮,例如消費者方面。生產者方面是相似的。 首先它等待對充分旗語然後(1)它需要由互斥下的隊列中的消息,然後它用信號空信號(2)

問題:

如果消費者進程在(1)和(2)之間崩潰,那麼有效的進程可以使用的隊列中的槽數減1。 假設當消費者宕機時,生產者可以處理隊列被填滿。 (它可以指定等待空信號量時的超時時間,或者甚至可以指定0表示不等待)。

當消費者重新啓動時,它可以繼續從隊列中讀取數據。數據不會被溢出,但即使它清空了所有的全部插槽後,製作者也會有一個空閒插槽可供使用。

多個這樣的重新啓動之後的隊列將沒有槽,可用於,沒有消息可以被髮送。

問:

怎麼能這種情況是可以避免或返還?

+0

你最好跟蹤共享內存塊中隊列的狀態,並使用事件而不是信號量。我無法給出更具體的建議;您對現有算法的描述過於模糊。 (隊列是一個環形緩衝區,或FIFO,或者什麼?當消費者在崩潰後重新啓動時,它如何知道首先要讀取哪個插槽?) –

+0

在共享內存中管理的數據結構的細節在這裏並不重要。對於這個討論,假定一個帶有兩個偏移量的環形緩衝區,一個用於讀取一個指向恆定大小的槽的寫入。生產者僅使用和更改寫入偏移量和僅使用者使用者,並更改讀取偏移量。由於偏移量存儲在共享內存中,重新啓動的消費者簡單在崩潰之前讀取偏移量的位置處拾取。由於消費者沒有釋放寫信號量,因此沒有生產者超支的風險。 –

+0

由於隊列的狀態部分保持在信號量的狀態,因此您提議跟蹤共享內存中的隊列狀態不起作用。如果消費者在完整的信號量下等待並且沒有在發生崩潰之前發出空信號量的信號,則完整項目的計數減少,但完整信號量的計數沒有增加。 不會發生數據溢出,但生產者現在對隊列中元素的總數有不正確的看法。 –

回答

1

這裏有一個簡單的方法大綱,使用事件,而不是信號燈:

DWORD increment_offset(DWORD offset) 
{ 
    offset++; 
    if (offset == QUEUE_LENGTH*2) offset = 0; 
    return offset; 
} 

void consumer(void) 
{ 
    for (;;) 
    { 
     DWORD current_write_offset = InterlockedCompareExchange(write_offset, 0, 0); 

     if ((current_write_offset != *read_offset + QUEUE_LENGTH) && 
      (current_write_offset + QUEUE_LENGTH != *read_offset)) 
     { 
      // Queue is not full, make sure producer is awake 
      SetEvent(signal_producer_event); 
     } 

     if (*read_offset == current_write_offset) 
     { 
      // Queue is empty, wait for producer to add a message 
      WaitForSingleObject(signal_consumer_event, INFINITE); 
      continue; 
     } 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     consume((*read_offset) % QUEUE_LENGTH); 

     InterlockedExchange(read_offset, increment_offset(*read_offset)); 
    } 
} 

void producer(void) 
{ 
    for (;;) 
    { 
     DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if (current_read_offset != *write_offset) 
     { 
      // Queue is not empty, make sure consumer is awake 
      SetEvent(signal_consumer_event); 
     } 

     if ((*write_offset == current_read_offset + QUEUE_LENGTH) || 
      (*write_offset + QUEUE_LENGTH == current_read_offset)) 
     { 
      // Queue is full, wait for consumer to remove a message 
      WaitForSingleObject(signal_producer_event, INFINITE); 
      continue; 
     } 

     produce((*write_offset) % QUEUE_LENGTH); 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     InterlockedExchange(write_offset, increment_offset(*write_offset)); 
    } 
} 

注:

  • 代碼張貼編譯(給予適當的聲明),但我有沒有其他測試它。

  • read_offset是共享內存中指向DWORD的指針,指示應從下一個讀取哪個插槽。類似地,write_offset指向共享存儲器中的DWORD,指示下一個應寫入哪個插槽。

  • QUEUE_LENGTH + x的偏移量指的是與x的偏移量相同的時隙,以消除完整隊列和空隊列之間的歧義。這就是爲什麼increment_offset()函數檢查QUEUE_LENGTH*2而不僅僅是QUEUE_LENGTH,以及爲什麼我們調用consume()produce()函數時取模。 (這種方法的一種替代方法是修改生產者從不使用最後一個可用插槽,但這會浪費一個插槽。)

  • signal_consumer_eventsignal_producer_event必須是自動重置事件。請注意,設置已設置的事件是無操作的。

  • 如果隊列實際上是空的,消費者僅等待其事件,並且如果隊列實際上已滿,則生產者僅等待其事件。

  • 當任一進程被喚醒時,它必須重新檢查隊列的狀態,因爲存在可能導致虛假喚醒的爭用條件。

  • 因爲我使用互鎖操作,並且因爲一次只有一個進程正在使用任何特定的插槽,所以不需要互斥鎖。我已經包含了內存障礙,以確保生產者寫入插槽的更改將被消費者看到。如果您不熟悉無鎖代碼,您會發現將所顯示的算法轉換爲使用互斥鎖並不重要。

  • 請注意,InterlockedCompareExchange(pointer, 0, 0);看起來有點複雜,但它只是一個線程安全等價於*pointer,即它讀取指針處的值。同樣,InterlockedExchange(pointer, value);*pointer = value;相同,但是線程安全。根據編譯器和目標體系結構的不同,互鎖操作可能不是絕對必要的,但性能影響可以忽略不計,因此我建議您進行防禦性編程。

考慮消費者在致電consume()函數期間(或之前)崩潰的情況。當消費者重新啓動時,它會再次選取相同的消息並按正常方式處理。就製作人而言,沒有什麼不尋常的事情發生,除了消息比平時處理時間更長。如果生產者在創建消息時崩潰,則會出現類似的情況;重新啓動時,生成的第一條消息將覆蓋不完整的消息,並且消費者不會受到影響。顯然,如果崩潰發生在調用InterlockedExchange之後,但在生產者或消費者調用SetEvent之前發生,並且如果隊列先前是空的或已滿的,那麼另一個進程將不會在那個時候被喚醒點。但是,一旦崩潰的進程重新啓動,它就會被喚醒。您不能在隊列中丟失插槽,並且進程無法死鎖。

我認爲簡單的多生產單消費的情況會是這個樣子:

void producer(void) 
{ 
    for (;;) 
    { 
     DWORD current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if (current_read_offset != *write_offset) 
     { 
      // Queue is not empty, make sure consumer is awake 
      SetEvent(signal_consumer_event); 
     } 

     produce_in_local_cache(); 

     claim_mutex(); 

     // read offset may have changed, re-read it 
     current_read_offset = InterlockedCompareExchange(read_offset, 0, 0); 

     if ((*write_offset == current_read_offset + QUEUE_LENGTH) || 
      (*write_offset + QUEUE_LENGTH == current_read_offset)) 
     { 
      // Queue is full, wait for consumer to remove a message 
      WaitForSingleObject(signal_producer_event, INFINITE); 
      continue; 
     } 

     copy_from_local_cache_to_shared_memory((*write_offset) % QUEUE_LENGTH); 

     MemoryBarrier(); 
     _ReadWriteBarrier; 

     InterlockedExchange(write_offset, increment_offset(*write_offset)); 

     release_mutex(); 
    } 
} 

如果活動製片人崩潰,如廢棄的互斥體將被檢測;你可以像對待這個互斥體一樣正確地釋放這個情況。如果崩潰的進程增加了寫入偏移量,那麼它添加的條目將照常進行處理;如果不是,它會被任何一個製片人聲稱互斥體覆蓋。在任何情況下都不需要任何特殊行動。

+0

謝謝哈利,這個清楚而明確的回答。我會測試它,並在這裏報告如何。 目前我有一個問題,但。這隻會支持單一生產者,單一消費者嗎?雖然這正是我所要求的,但如果我可以爲多生產者單一消費者場景工作,那將是非常好的。 謝謝! –

+0

該算法特定於單個生產者,單個消費者。多個生產者,多個消費者看起來會完全不同,在這種情況下,環形緩衝區可能不是最好的選擇。你可能想檢查一本算法或其他的東西;儘管如果我有時間思考這個問題,我相信我可以想出一些有用的東西,但這可能不是最好的解決方案。 –

+0

謝謝。我環顧四周,但沒有找到滿足所有要求的解決方案。我爲Multiple Producer Single Consumer案例發佈了另一個問題(https://stackoverflow.com/questions/45768849/how-can-i-implement-a-robust-interprocess-multiple-producer-single-consumer-mess)。 –