2014-01-30 22 views
0

當我從生產者(線程1)向消費者(線程2)傳遞數據時,我有一段代碼用於測試各種容器(例如deque和循環緩衝區)。數據由具有一對時間戳的結構表示。第一個時間戳是在生產者推入之前進行的,第二個時間戳是在消費者彈出數據時進行的。 容器受pthread自旋鎖保護。在兩個線程之間傳遞數據時的時間不一致

機器運行Redhat 5.5 2.6.18內核(舊!),它是禁用超線程的4核系統。在所有測試中都使用了gcc 4.7和-std = C++ 11標誌。

生產者獲取鎖,將數據加時間戳並將其推入隊列,在忙碌的循環中解鎖並休眠2微秒(這是我發現在該系統上精確到2微秒的唯一可靠方式)。

消費者鎖,彈出數據,時間戳它並生成一些統計數據(運行平均延遲和標準偏差)。統計信息每5秒打印一次(M是平均值,M2是標準開發)並重置。我使用gettimeofday()來獲取時間戳,這意味着平均延遲數可以被認爲是超過1微秒的延遲的百分比。

大部分時間的輸出是這樣的:

CNT=2500000 M=0.00935 M2=0.910238 
    CNT=2500000 M=0.0204112 M2=1.57601 
    CNT=2500000 M=0.0045016 M2=0.372065 

但有時(可能是1個試製出20)所示:

CNT=2500000 M=0.523413 M2=4.83898 
    CNT=2500000 M=0.558525 M2=4.98872 
    CNT=2500000 M=0.581157 M2=5.05889 

(注意平均數比差遠了在第一種情況下,它不會在程序運行時恢復)。

我會很感激爲什麼會發生這種情況。謝謝。

#include <iostream> 
#include <string.h> 
#include <stdexcept> 
#include <sys/time.h> 
#include <deque> 
#include <thread> 
#include <cstdint> 
#include <cmath> 
#include <unistd.h> 
#include <xmmintrin.h> // _mm_pause() 

int64_t timestamp() { 
    struct timeval tv; 
    gettimeofday(&tv, 0); 
    return 1000000L * tv.tv_sec + tv.tv_usec; 
} 

//running mean and a second moment 
struct StatsM2 { 
    StatsM2() {} 
    double m = 0; 
    double m2 = 0; 
    long count = 0; 
    inline void update(long x, long c) { 
     count = c; 
     double delta = x - m; 
     m += delta/count; 
     m2 += delta * (x - m); 
    } 
    inline void reset() { 
     m = m2 = 0; 
     count = 0; 
    } 
    inline double getM2() { // running second moment 
     return (count > 1) ? m2/(count - 1) : 0.; 
    } 
    inline double getDeviation() { 
     return std::sqrt(getM2()); 
    } 
    inline double getM() { // running mean 
     return m; 
    } 
}; 

// pause for usec microseconds using busy loop 
int64_t busyloop_microsec_sleep(unsigned long usec) { 
    int64_t t, tend; 
    tend = t = timestamp(); 
    tend += usec; 
    while (t < tend) { 
     t = timestamp(); 
    } 
    return t; 
} 

struct Data { 
    Data() : time_produced(timestamp()) {} 
    int64_t time_produced; 
    int64_t time_consumed; 
}; 

int64_t sleep_interval = 2; 
StatsM2 statsm2; 
std::deque<Data> queue; 
bool producer_running = true; 
bool consumer_running = true; 
pthread_spinlock_t spin; 

void producer() { 
    producer_running = true; 
    while(producer_running) { 
     pthread_spin_lock(&spin); 
     queue.push_back(Data()); 
     pthread_spin_unlock(&spin); 
     busyloop_microsec_sleep(sleep_interval); 
    } 
} 

void consumer() { 
    int64_t count = 0; 
    int64_t print_at = 1000000/sleep_interval * 5; 
    Data data; 
    consumer_running = true; 
    while (consumer_running) { 
     pthread_spin_lock(&spin); 
     if (queue.empty()) { 
      pthread_spin_unlock(&spin); 
      // _mm_pause(); 
      continue; 
     } 
     data = queue.front(); 
     queue.pop_front(); 
     pthread_spin_unlock(&spin); 
     ++count; 
     data.time_consumed = timestamp(); 
     statsm2.update(data.time_consumed - data.time_produced, count); 
     if (count >= print_at) { 
      std::cerr << "CNT=" << count << " M=" << statsm2.getM() << " M2=" << statsm2.getDeviation() << "\n"; 
      statsm2.reset(); 
      count = 0; 
     } 
    } 
} 

int main(void) { 
    if (pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE) < 0) 
     exit(2); 
    std::thread consumer_thread(consumer); 
    std::thread producer_thread(producer); 
    sleep(40); 
    consumer_running = false; 
    producer_running = false; 
    consumer_thread.join(); 
    producer_thread.join(); 
    return 0; 
} 
+0

漂亮的房間加熱器。 –

+0

嘗試記錄構成統計數據的完整數據集以跟蹤此情況。可以想象得到這些是因爲數據中存在垃圾。雖然出列線程不是線程安全的,但您不應該在它的表面上需要「易失性」,因爲鎖應該發出障礙。儘管如此,也許編譯器仍在移動變量 - 很難說沒有看到反彙編。將出列標記爲「volatile」以查看是否有幫助。 – mockinterface

+0

我記錄的數據是一致的,不幸的是volatile不起作用。謝謝你的建議。 – Cattus

回答

1

編輯:
我認爲,以下5是可以解釋1/2秒的等待時間的唯一的事。當在同一個核心上時,每個核心都會運行很長時間,然後切換到另一個核心。
列表中其餘的東西太小,導致1/2秒的延遲。
您可以使用pthread_setaffinity_np將您的線程固定到特定內核。您可以嘗試不同的組合並查看性能如何變化。

編輯#2:
你應該照顧更多的東西:(誰說,測試很簡單......)
1.確保消費者已經運行當製片人開始產生。由於生產者並不是真正在緊張的環境中生產,因此對您而言並不重要。
2. 這是非常重要的:你每次除以數,這是不正確的做你的統計。這意味着每個統計窗口中的第一次測量比最後一次測量重要得多。要衡量中位數,您必須收集所有的值。在沒有收集所有數字的情況下,測量平均值和最小值/最大值應能夠爲您提供足夠好的延遲圖。


這並不奇怪,真的。
1.時間在Data()中,但容器花費時間調用malloc。
2.您正在運行64位或32位?在32位gettimeofday是一個系統調用,而在64位它是一個沒有進入內核的VDSO ...你可能需要自己計算gettimeofday並記錄差異。或使用rdtsc註冊您自己的。
最好的辦法是使用循環代替微型計算機,因爲微型計算機對於這種情況實在太大了......只有微調時微調會讓你在處理如此小規模的事情時非常傾斜
3.您是否得到保證在生產者和消費者之間不被搶佔?我想不是。但是這不應該在專門用於測試的盒子上經常發生......
4.它是在一個插座上還是在2個上是4個核心?如果它是一個2套接字盒,您希望在同一個套接字上擁有2個線程,或者您支付(至少)兩次數據傳輸。
5.確保線程不在同一核心上運行。
6.如果您傳輸的數據和附加數據(容器節點)與其他Data +節點共享緩存行(可能有多種類型),則生產者在寫入消耗的時間戳時會被延遲。這被稱爲虛假分享。您可以通過填充/對齊至64字節並使用侵入式容器來消除此問題。

0

gettimeofday不是分析計算開銷的好方法。這是掛鐘,你的電腦是多處理器。即使你認爲你沒有運行其他任何東西,操作系統調度程序總是有一些其他的活動來保持系統運行。要分析您的流程開銷,您必須至少提高您正在分析的流程的優先級。還可以使用高分辨率定時器或cpu刻度來執行時間測量。

相關問題