更新:我已經更新了基於下面的正確答案代碼。這有效,但會產生一個新問題(我會發佈一個新問題)。在java中編寫線程安全的阻塞有界緩衝區結構時,如何處理索引溢出?
創建阻塞界使用旗語與多個生產者和消費者Buffer類。
目標是使用原子整數作爲指針,所以我不必在內部進行同步。溢出處理現已更正爲使用CAS。
但除非我周圍使用的AtomicInteger指針同步,這並不工作(見下文評論)。 不知道爲什麼?滾動所有以下方式,看看我的意思是「缺項」 ......
public class BoundedBuffer<T> {
private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
private AtomicReferenceArray<T> m_buffer = null;
private Semaphore m_full = new Semaphore(BUFFER_SIZE);
private Semaphore m_empty = new Semaphore(0);
private AtomicInteger m_writePointer = new AtomicInteger();
private AtomicInteger m_readPointer = new AtomicInteger();
public BoundedBuffer() {
m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
}
public static int safeGetAndIncrement(AtomicInteger i) {
int oldValue = 0, newValue = 0;
do {
oldValue = i.get();
newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
} while (!i.compareAndSet(oldValue, newValue));
return oldValue;
}
public void add(T data) throws InterruptedException {
m_full.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
m_buffer.set(safeGetAndIncrement(m_writePointer),data);
}
m_empty.release();
}
public T get() throws InterruptedException {
T data = null;
m_empty.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
data = m_buffer.get(safeGetAndIncrement(m_readPointer));
}
m_full.release();
return data;
}
}
測試程序有...
8生產者線程,每年投入約10000項到隊列。 每個條目是格式的字符串:A「:」 B 其中 A是數字0..7用於8個線程。 B是距離0..9999
4消費者線程,消耗的一切數字增加計數,直到空被擊中。
一旦生產線已經完成了添加什麼東西都往緩衝,4空的被添加到隊列(停止消費者)。
輸出線程...
,P:數據,1:9654 @ 1 ,P:數據,5:1097 @ 347 C:數據,1:9654 @ 1 ,P:數據4:5538 @ 1 C:數據,4:5538 @ 1個 C:數據,空@ 14466
驗證 當驗證,如果印製的所有條目都是消費者,少數出現缺失(之前arrayindexoutofbounds被擊中(可能只是巧合)。
驗證中... 失蹤4:5537 缺少5:1096 驗證
「我看過缺失的條目」對於任何人都沒有足夠的信息來弄清楚你的意思。 –
Missing =消費者沒有看到生產者放入的所有條目。示例輸出....開始:0 開始:2 開始:1 開始:3 ,P:數據,1:9654 @ 1 ,P:數據,5:1097 @ 347 C:數據,1:9654 @ 1 ,P:數據, 4:5538 @ 1個 C:數據,4:5538 @ 1 完成:0 完成:3 完成:1 C:數據,空@ 14466 表面處理:2 驗證... 缺少4:5537 Missing 5:1096 已驗證 – TechnoJab