當我從生產者(線程1)向消費者(線程2)傳遞數據時,我有一段代碼用於測試各種容器(例如deque和循環緩衝區)。數據由具有一對時間戳的結構表示。第一個時間戳是在生產者推入之前進行的,第二個時間戳是在消費者彈出數據時進行的。 容器受pthread自旋鎖保護。在兩個線程之間傳遞數據時的時間不一致
機器運行Redhat 5.5 2.6.18內核(舊!),它是禁用超線程的4核系統。在所有測試中都使用了gcc 4.7和-std = C++ 11標誌。
生產者獲取鎖,將數據加時間戳並將其推入隊列,在忙碌的循環中解鎖並休眠2微秒(這是我發現在該系統上精確到2微秒的唯一可靠方式)。
消費者鎖,彈出數據,時間戳它並生成一些統計數據(運行平均延遲和標準偏差)。統計信息每5秒打印一次(M是平均值,M2是標準開發)並重置。我使用gettimeofday()來獲取時間戳,這意味着平均延遲數可以被認爲是超過1微秒的延遲的百分比。
大部分時間的輸出是這樣的:
CNT=2500000 M=0.00935 M2=0.910238
CNT=2500000 M=0.0204112 M2=1.57601
CNT=2500000 M=0.0045016 M2=0.372065
但有時(可能是1個試製出20)所示:
CNT=2500000 M=0.523413 M2=4.83898
CNT=2500000 M=0.558525 M2=4.98872
CNT=2500000 M=0.581157 M2=5.05889
(注意平均數比差遠了在第一種情況下,它不會在程序運行時恢復)。
我會很感激爲什麼會發生這種情況。謝謝。
#include <iostream>
#include <string.h>
#include <stdexcept>
#include <sys/time.h>
#include <deque>
#include <thread>
#include <cstdint>
#include <cmath>
#include <unistd.h>
#include <xmmintrin.h> // _mm_pause()
int64_t timestamp() {
struct timeval tv;
gettimeofday(&tv, 0);
return 1000000L * tv.tv_sec + tv.tv_usec;
}
//running mean and a second moment
struct StatsM2 {
StatsM2() {}
double m = 0;
double m2 = 0;
long count = 0;
inline void update(long x, long c) {
count = c;
double delta = x - m;
m += delta/count;
m2 += delta * (x - m);
}
inline void reset() {
m = m2 = 0;
count = 0;
}
inline double getM2() { // running second moment
return (count > 1) ? m2/(count - 1) : 0.;
}
inline double getDeviation() {
return std::sqrt(getM2());
}
inline double getM() { // running mean
return m;
}
};
// pause for usec microseconds using busy loop
int64_t busyloop_microsec_sleep(unsigned long usec) {
int64_t t, tend;
tend = t = timestamp();
tend += usec;
while (t < tend) {
t = timestamp();
}
return t;
}
struct Data {
Data() : time_produced(timestamp()) {}
int64_t time_produced;
int64_t time_consumed;
};
int64_t sleep_interval = 2;
StatsM2 statsm2;
std::deque<Data> queue;
bool producer_running = true;
bool consumer_running = true;
pthread_spinlock_t spin;
void producer() {
producer_running = true;
while(producer_running) {
pthread_spin_lock(&spin);
queue.push_back(Data());
pthread_spin_unlock(&spin);
busyloop_microsec_sleep(sleep_interval);
}
}
void consumer() {
int64_t count = 0;
int64_t print_at = 1000000/sleep_interval * 5;
Data data;
consumer_running = true;
while (consumer_running) {
pthread_spin_lock(&spin);
if (queue.empty()) {
pthread_spin_unlock(&spin);
// _mm_pause();
continue;
}
data = queue.front();
queue.pop_front();
pthread_spin_unlock(&spin);
++count;
data.time_consumed = timestamp();
statsm2.update(data.time_consumed - data.time_produced, count);
if (count >= print_at) {
std::cerr << "CNT=" << count << " M=" << statsm2.getM() << " M2=" << statsm2.getDeviation() << "\n";
statsm2.reset();
count = 0;
}
}
}
int main(void) {
if (pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE) < 0)
exit(2);
std::thread consumer_thread(consumer);
std::thread producer_thread(producer);
sleep(40);
consumer_running = false;
producer_running = false;
consumer_thread.join();
producer_thread.join();
return 0;
}
漂亮的房間加熱器。 –
嘗試記錄構成統計數據的完整數據集以跟蹤此情況。可以想象得到這些是因爲數據中存在垃圾。雖然出列線程不是線程安全的,但您不應該在它的表面上需要「易失性」,因爲鎖應該發出障礙。儘管如此,也許編譯器仍在移動變量 - 很難說沒有看到反彙編。將出列標記爲「volatile」以查看是否有幫助。 – mockinterface
我記錄的數據是一致的,不幸的是volatile不起作用。謝謝你的建議。 – Cattus