正如其他人提到的那樣,您需要有某種內存屏障來確保事物在多個處理器(和進程)之間正確同步。
我建議你改變你的方案,定義一組當前可用的條目,並在新條目變爲可用時使用互鎖增量。
http://msdn.microsoft.com/en-us/library/windows/desktop/ms683614%28v=vs.85%29.aspx
的結構,我建議是這樣的,所以你實際上可以達到你想要什麼,並做到這一點很快:
// at the very start, the number of buffers you might have total
uint32_t m_size; // if you know the max. number maybe use a const instead...
// then m_size structures, one per buffer:
uint32_t m_offset0; // offset to your data
uint32_t m_size0; // size of that buffer
uint32_t m_busy0; // whether someone is working on the buffer
uint32_t m_offset1;
uint32_t m_size1;
uint32_t m_busy1;
...
uint32_t m_offsetN;
uint32_t m_sizeN;
uint32_t m_busyN;
隨着偏移和大小,你獲得的任何直接訪問緩衝區在您的映射區域。要分配一個緩衝區,你可能想要實現類似於malloc()所做的一些事情,儘管所有必要的信息都在這個表中,所以不需要鏈接列表等。但是,如果你要釋放一些緩衝區,你需要跟蹤它的大小。如果你一直分配/釋放,你就會有分裂的樂趣。無論如何...
另一種方式是利用環形緩衝區(本質上是一個「管道」),所以你總是在最後一個緩衝區之後分配,如果沒有足夠的空間,那麼在開始時分配,關閉N根據新的緩衝區大小要求所需的緩衝區......這可能會更容易實現。但是,這意味着您可能需要知道在尋找緩衝區時從哪裏開始(例如,對於當前被認爲是「第一個」[最老]緩衝區的索引,它將成爲下一個將被重用的緩衝區。)
但是由於您沒有解釋緩衝區如何變得「老」並且可以重用(釋放以便可以重用),所以我無法真正給你一個確切的實現。但是像下面這樣的東西可能會爲你做。
在頭結構中,如果m_offset爲零,那麼緩衝區當前未分配,因此與該條目無關。如果m_busy爲零,則沒有進程正在訪問該緩衝區。我還提供了一個可以是0或1的m_free字段。只要需要更多緩衝區來保存剛收到的數據,作者就會將該參數設置爲1。我對這個問題並沒有太深入的瞭解,因爲我再也不知道你是如何釋放緩衝區的。如果你永遠不釋放緩衝區也不是必需的。
0)結構
// only if the size varies between runs, otherwise use a constant like:
// namespace { uint32_t const COUNT = 123; }
struct header_count_t
{
uint32_t m_size;
};
struct header_t
{
uint32_t m_offset;
uint32_t m_size;
uint32_t m_busy; // to use with Interlocked...() you may want to use LONG instead
};
// and from your "ptr" you'd do:
header_count_t *header_count = (header_count_t *) ptr;
header_count->m_size = ...; // your dynamic size (if dynamic it needs to be)
header_t *header = (header_t *) (header_count + 1);
// first buffer will be at: data = (char *) (header + header_count->m_size)
for(size_t n(0); n < header_count->m_size; ++n)
{
// do work (see below) on header[n]
...
}
1)來訪問數據必須首先鎖定該緩衝器中,如果沒有可用的,與下一個重試的寫入器;鎖定與InterlockedIncrement()
和解鎖完成與InterlockedDecrement()
:
InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset == nullptr)
{
// buffer not allocated yet, allocate now and copy data,
// but do not save the offset until "much" later
uint32_t offset = malloc_buffer();
memcpy(ptr + offset, source_data, size);
header[n]->m_size = size;
// extra memory barrier to make sure that the data copied
// in the buffer is all there before we save the offset
InterlockedIncrement(&header[n]->m_busy);
header[n]->m_offset = offset;
InterlockedDecrement(&header[n]->m_busy);
}
InterlockedDecrement(&header[n]->m_busy);
現在,如果你希望能夠釋放緩衝區,這將是不夠的。在這種情況下,需要另一個標誌來防止其他進程重新使用舊的緩衝區。再次,這將取決於您的實施...(請參閱下面的示例)。
2)訪問數據的讀取器必須先用緩衝區鎖定緩衝區,然後用InterlockedIncrement()
緩衝區完成,它需要釋放緩衝區InterlockedDecrement()
。請注意,即使m_offset爲nullptr,該鎖也適用。
InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset)
{
// do something with the buffer
uint32_t size(header[n]->m_size);
char const *buffer_ptr = ptr + header[n]->m_offset;
...
}
InterlockedDecrement(header[n]->m_busy);
所以這裏我只是測試是否設置m_offset。 3)如果你想釋放一個緩衝區,你還需要測試另一個標誌(見下面),如果另一個標誌爲真(或假),那麼緩衝區即將被釋放(儘快因爲所有進程都發布了它),然後該標誌可以用在前面的代碼片段中(即,m_offset爲零,或者該標誌爲1,m_busy
計數器正好爲1.)
對於作者而言,
LONG lock = InterlockedIncrement(&header[n]->m_busy);
if(header[n]->m_offset == nullptr
|| (lock == 1 && header[n]->m_free == 1))
{
// new buffer (nullptr) or reusing an old buffer
// reset the offset first
InterlockedIncrement(&header[n]->m_busy);
header[n]->m_offset = nullptr;
InterlockedDecrement(&header[n]->m_busy);
// then clear m_free
header[n]->m_free = 0;
InterlockedIncrement(&header[n]->m_busy); // WARNING: you need another Decrement against this one...
// code as before (malloc_buffer, memcpy, save size & offset...)
...
}
InterlockedDecrement(&header[n]->m_busy);
而且在讀者的測試與修改:
if(header[n]->m_offset && header[n]->m_free == 0)
作爲一個方面說明:所有Interlocked ...()函數都是完整的內存屏障(柵欄),所以你在這方面都很好。你必須使用其中的許多來確保你獲得正確的同步。
請注意,這是未經測試的代碼......但如果你想避免進程間信號量(這可能不會簡化這麼多),那就是要走的路。請注意,本身不需要20ms的sleep(),除非爲了避免每個閱讀器出現一個固定的CPU,顯然。
不可以。很難猜測睡眠20毫秒如何比互斥鎖更有效。這當然是不可能的。高賠率你應該使用管道來代替。 –
@HansPassant:它更高效,因爲讀者大多會略微滯後於作者。所以輪詢非常低,可以並行工作。如果我使用互斥鎖(不像示例),那麼讀者和作者不能並行工作...... – ronag
不知道如何將管道/消息隊列應用於此? – ronag