2011-10-28 31 views
2

在讀完C++ 0x的原子並結合非鎖定隊列之後,我決定去玩它們。在C++中使用原子的樂觀鎖定策略和排序

這個想法是寫一個生產者,樂觀鎖定多個消費者隊列。消息不需要消耗。只要消費者讀取它即可讀取最後一個版本或知道讀取的結果不好,跳過就沒有問題。

在下面的代碼中,我想到的策略失敗了。由於數據亂序寫入,數據被破壞。任何指示爲什麼這是,以及如何解決它將不勝感激。在Linux

彙編:克++ -std =的C++ 0x -o代碼code.cpp -lpthread

謝謝, 丹尼斯

// 
// This features 2 threads in which the first writes to a structure 
// and the second tries to read from that with an optimistic 
// locking strategy. The data is equal to the versioning so we can 
// see if the data is corrupt or not. 
// 
// @since: 2011-10-28 
// @author: Dennis Fleurbaaij <[email protected]> 
// 

#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <unistd.h> 
#include <stdatomic.h> 
#include <sched.h> 
#include <assert.h> 
#include <iostream> 
#include <xmmintrin.h> 

struct structure_t 
{ 
    std::atomic<unsigned int> id; 
    unsigned int data_a; 
    unsigned int data_b; 

    char _pad[ 64 - 12 ]; 
}; 

#define NUM_STRUCTURES 2 
struct structure_t structures[NUM_STRUCTURES]; 

std::atomic<size_t> current_version_index; 

volatile bool start = false; 
volatile bool run = true; 
size_t const iter_count = 10000000; 

/** 
* Write thread 
*/ 
void* writer(void*) 
{ 
    while(!start) 
     sched_yield(); 

    size_t i; 
    for(i=0 ; i<iter_count ; ++i) 
    { 
     size_t index = current_version_index.load(std::memory_order_relaxed); 
     size_t next_index = (current_version_index + 1) & NUM_STRUCTURES-1; 

     structures[next_index].data_a = i; 
     structures[next_index].data_b = i; 

     structures[next_index].id.store(i, std::memory_order_release); 

     current_version_index.store(next_index); 

     //std::cout << "Queued - id: " << i << ", index: " << next_index << std::endl; 
     //sleep(1); 
    } 

    run=false; 
} 

/** 
* Read thread 
*/ 
void* reader(void*) 
{ 
    while(!start) 
     sched_yield(); 

    unsigned int prev_id=0; 

    size_t i; 
    while(run) 
    { 
     size_t index = current_version_index.load(std::memory_order_relaxed); 
     unsigned int id = structures[index].id.load(std::memory_order_acquire); 

     if(id > prev_id) 
     { 
      unsigned int data_a = structures[index].data_a; 
      unsigned int data_b = structures[index].data_b; 

      // Re-read the data and check optimistic lock. This should be read after 
      // the lines above and should not be optimized away. 
      // 
      // This is what fails after a while: 
      // Error in data. Index: 0, id: 24097, id2: 24097, data_a: 24099, data_b: 24099 
      unsigned int id2 = structures[index].id.load(std::memory_order_acquire); 
      if(id2 > id) 
      { 
       continue; 
      } 

      if(id != id2 || 
       id != data_a || 
       id != data_b) 
      { 
       std::cerr << "Error in data. Index: " << index << ", id: " << id 
          << ", id2: " << id2 << ", data_a: " << data_a << ", data_b: " << data_b << std::endl; 

       exit(EXIT_FAILURE); 
      } 

      //std::cout << "Read. Index: " << index << ", id: " << id 
      //    << ", data_a: " << data_a << ", data_b: " << data_b << std::endl; 

      prev_id = id; 
     } 

     _mm_pause(); 
    } 
} 

/** 
* Main 
*/ 
int main (int argc, char *argv[]) 
{ 
    assert(sizeof(structure_t) == 64); 

    pthread_t write_thread, read_thread; 
    pthread_create(&write_thread, NULL, writer, (void*)NULL); 
    pthread_create(&read_thread, NULL, reader, (void*)NULL); 

    sleep(1); 

    start = 1; 

    void *status; 
    pthread_join(read_thread, &status); 
    pthread_join(write_thread, &status); 
} 
+0

我不確定這不屬於codereview而不是這裏。無論如何,你應該用純英語陳述你打算實施的策略,以便將其與實施的策略進行比較。 –

+0

我希望我在前面的文字中說清楚了。除了我寫的內容外,你能告訴我你在設計方面需要什麼嗎? 「這個想法是寫一個單一的生產者,樂觀鎖定多個消費者隊列。」 也許有補充。消息不需要消耗。只要消費者讀取它即可讀取最後一個版本或知道讀取的結果不好,跳過就沒有問題。 – DennisFleurbaaij

+0

我並不熟悉C++ 11原子的速度;可能相關的問題:'truct'的填充暗示你想要結構對齊?你如何確保它對齊?這對正確性有重要意義嗎? – sehe

回答

0

也許,這是一個錯誤:

structures[next_index].data_a = i; 
    structures[next_index].data_b = i; 

// **** The writer may be interrupted (preempted) here for a long time *** 
// at the same time the reader reads new data but old id (number of reads doesn't matter) 

    structures[next_index].id.store(i, std::memory_order_release); // too late! 

(current_version_index可能由讀者從前面的步驟採取,所以競爭條件實際上是可能的)

+0

你是完全正確的!我想到了這一點,但認爲我得到的結果得到的是我看到的反面(更高的id,但這顯然是錯誤的)。 再次感謝! – DennisFleurbaaij