2014-06-14 30 views
1

我正在使用內存映射文件進行跨進程數據共享。跨進程內存屏障

我有兩個進程,一個寫入數據塊和一個或多個讀取這些塊的其他進程。爲了讓讀者知道一個塊是否準備就緒,我正在寫兩個「標籤」值,一個在開始處,一個在每個塊的末尾,表示已準備就緒。

它看起來是這樣的:

注意:在這個例子中,我不包括的事實,讀者進程可以尋求到以前的塊。

static const int32_t START_TAG = 0xFAFAFAFA; 
static const int32_t END_TAG = 0x06060606; 

void writer_process(int32_t* memory_mapped_file_ptr) 
{ 
    auto ptr = memory_mapped_file_ptr; 
    while (true) 
    { 
     std::vector<int32_t> chunk = generate_chunk(); 
     std::copy(ptr + 2, chunk.begin(), chunk.end()); 

     // We are done writing. Write the tags. 

     *ptr = START_TAG; 
     ptr += 1; 
     *ptr = chunk.size(); 
     ptr += 1 + chunk.size(); 
     *ptr = END_TAG; 
     ptr += 1; 
    } 
} 

void reader_process(int32_t* memory_mapped_file_ptr) 
{ 
    auto ptr = memory_mapped_file_ptr; 
    while (true) 
    { 
     auto ptr2 = ptr; 

     std::this_thread::sleep_for(std::chrono::milliseconds(20)); 

     if (*ptr2 != START_TAG) 
      continue; 

     ptr2 += 1; 

     auto len = *ptr2; 
     ptr2 += 1; 

     if (*(ptr2 + len) != END_TAG) 
      continue; 

     std::vector<int32_t> chunk(ptr2, ptr2 + len); 

     process_chunk(chunk); 
    } 
} 

此類作品至今。但它在我看來是一個非常糟糕的想法,並可能由於緩存行爲而導致各種奇怪的錯誤。

有沒有更好的方法來實現這一目標?

我看:

  • 消息隊列:效率低下,只有一個讀者的作品。我也無法尋求前面的塊。

  • 互斥鎖:不確定如何鎖定當前塊而不是整個內存。我無法爲每個可能的塊創建互斥鎖(特別是因爲它們具有動態大小)。我曾考慮將內存分成一個互斥塊,但由於寫入和讀取之間的延遲,這對我來說不起作用。

+0

不可以。很難猜測睡眠20毫秒如何比互斥鎖更有效。這當然是不可能的。高賠率你應該使用管道來代替。 –

+0

@HansPassant:它更高效,因爲讀者大多會略微滯後於作者。所以輪詢非常低,可以並行工作。如果我使用互斥鎖(不像示例),那麼讀者和作者不能並行工作...... – ronag

+0

不知道如何將管道/消息隊列應用於此? – ronag

回答

1

正如其他人提到的那樣,您需要有某種內存屏障來確保事物在多個處理器(和進程)之間正確同步。

我建議你改變你的方案,定義一組當前可用的條目,並在新條目變爲可用時使用互鎖增量。

http://msdn.microsoft.com/en-us/library/windows/desktop/ms683614%28v=vs.85%29.aspx

的結構,我建議是這樣的,所以你實際上可以達到你想要什麼,並做到這一點很快:

// at the very start, the number of buffers you might have total 
uint32_t m_size; // if you know the max. number maybe use a const instead... 

// then m_size structures, one per buffer: 
uint32_t m_offset0; // offset to your data 
uint32_t m_size0; // size of that buffer 
uint32_t m_busy0; // whether someone is working on the buffer 
uint32_t m_offset1; 
uint32_t m_size1; 
uint32_t m_busy1; 
... 
uint32_t m_offsetN; 
uint32_t m_sizeN; 
uint32_t m_busyN; 

隨着偏移和大小,你獲得的任何直接訪問緩衝區在您的映射區域。要分配一個緩衝區,你可能想要實現類似於malloc()所做的一些事情,儘管所有必要的信息都在這個表中,所以不需要鏈接列表等。但是,如果你要釋放一些緩衝區,你需要跟蹤它的大小。如果你一直分配/釋放,你就會有分裂的樂趣。無論如何...

另一種方式是利用環形緩衝區(本質上是一個「管道」),所以你總是在最後一個緩衝區之後分配,如果沒有足夠的空間,那麼在開始時分配,關閉N根據新的緩衝區大小要求所需的緩衝區......這可能會更容易實現。但是,這意味着您可能需要知道在尋找緩衝區時從哪裏開始(例如,對於當前被認爲是「第一個」[最老]緩衝區的索引,它將成爲下一個將被重用的緩衝區。)

但是由於您沒有解釋緩衝區如何變得「老」並且可以重用(釋放以便可以重用),所以我無法真正給你一個確切的實現。但是像下面這樣的東西可能會爲你做。

在頭結構中,如果m_offset爲零,那麼緩衝區當前未分配,因此與該條目無關。如果m_busy爲零,則沒有進程正在訪問該緩衝區。我還提供了一個可以是0或1的m_free字段。只要需要更多緩衝區來保存剛收到的數據,作者就會將該參數設置爲1。我對這個問題並沒有太深入的瞭解,因爲我再也不知道你是如何釋放緩衝區的。如果你永遠不釋放緩衝區也不是必需的。

0)結構

// only if the size varies between runs, otherwise use a constant like: 
// namespace { uint32_t const COUNT = 123; } 
struct header_count_t 
{ 
    uint32_t m_size; 
}; 

struct header_t 
{ 
    uint32_t m_offset; 
    uint32_t m_size; 
    uint32_t m_busy; // to use with Interlocked...() you may want to use LONG instead 
}; 

// and from your "ptr" you'd do: 
header_count_t *header_count = (header_count_t *) ptr; 
header_count->m_size = ...; // your dynamic size (if dynamic it needs to be) 
header_t *header = (header_t *) (header_count + 1); 
// first buffer will be at: data = (char *) (header + header_count->m_size) 
for(size_t n(0); n < header_count->m_size; ++n) 
{ 
    // do work (see below) on header[n] 
    ... 
} 

1)來訪問數據必須首先鎖定該緩衝器中,如果沒有可用的,與下一個重試的寫入器;鎖定與InterlockedIncrement()和解鎖完成與InterlockedDecrement()

InterlockedIncrement(&header[n]->m_busy); 
if(header[n]->m_offset == nullptr) 
{ 
    // buffer not allocated yet, allocate now and copy data, 
    // but do not save the offset until "much" later 
    uint32_t offset = malloc_buffer(); 
    memcpy(ptr + offset, source_data, size); 
    header[n]->m_size = size; 

    // extra memory barrier to make sure that the data copied 
    // in the buffer is all there before we save the offset 
    InterlockedIncrement(&header[n]->m_busy); 
    header[n]->m_offset = offset; 
    InterlockedDecrement(&header[n]->m_busy); 
} 
InterlockedDecrement(&header[n]->m_busy); 

現在,如果你希望能夠釋放緩衝區,這將是不夠的。在這種情況下,需要另一個標誌來防止其他進程重新使用舊的緩衝區。再次,這將取決於您的實施...(請參閱下面的示例)。

2)訪問數據的讀取器必須先用緩衝區鎖定緩衝區,然後用InterlockedIncrement()緩衝區完成,它需要釋放緩衝區InterlockedDecrement() 。請注意,即使m_offset爲nullptr,該鎖也適用。

InterlockedIncrement(&header[n]->m_busy); 
if(header[n]->m_offset) 
{ 
    // do something with the buffer 
    uint32_t size(header[n]->m_size); 
    char const *buffer_ptr = ptr + header[n]->m_offset; 
    ... 
} 
InterlockedDecrement(header[n]->m_busy); 

所以這裏我只是測試是否設置m_offset。 3)如果你想釋放一個緩衝區,你還需要測試另一個標誌(見下面),如果另一個標誌爲真(或假),那麼緩衝區即將被釋放(儘快因爲所有進程都發布了它),然後該標誌可以用在前面的代碼片段中(即,m_offset爲零,或者該標誌爲1,m_busy計數器正好爲1.)

對於作者而言,

LONG lock = InterlockedIncrement(&header[n]->m_busy); 
if(header[n]->m_offset == nullptr 
|| (lock == 1 && header[n]->m_free == 1)) 
{ 
    // new buffer (nullptr) or reusing an old buffer 

    // reset the offset first 
    InterlockedIncrement(&header[n]->m_busy); 
    header[n]->m_offset = nullptr; 
    InterlockedDecrement(&header[n]->m_busy); 
    // then clear m_free 
    header[n]->m_free = 0; 
    InterlockedIncrement(&header[n]->m_busy); // WARNING: you need another Decrement against this one... 

    // code as before (malloc_buffer, memcpy, save size & offset...) 
    ... 
} 
InterlockedDecrement(&header[n]->m_busy); 

而且在讀者的測試與修改:

if(header[n]->m_offset && header[n]->m_free == 0) 

作爲一個方面說明:所有Interlocked ...()函數都是完整的內存屏障(柵欄),所以你在這方面都很好。你必須使用其中的許多來確保你獲得正確的同步。

請注意,這是未經測試的代碼......但如果你想避免進程間信號量(這可能不會簡化這麼多),那就是要走的路。請注意,本身不需要20ms的sleep(),除非爲了避免每個閱讀器出現一個固定的CPU,顯然。

+0

非常好。謝謝! – ronag