2011-09-10 30 views
2

更新:我已經更新了基於下面的正確答案代碼。這有效,但會產生一個新問題(我會發佈一個新問題)。在java中編寫線程安全的阻塞有界緩衝區結構時,如何處理索引溢出?

創建阻塞界使用旗語與多個生產者和消費者Buffer類。

目標是使用原子整數作爲指針,所以我不必在內部進行同步。溢出處理現已更正爲使用CAS。

但除非我周圍使用的AtomicInteger指針同步,這並不工作(見下文評論)。 不知道爲什麼?滾動所有以下方式,看看我的意思是「缺項」 ......

public class BoundedBuffer<T> { 
private static final int BUFFER_SIZE = Short.MAX_VALUE+1; 
private AtomicReferenceArray<T> m_buffer = null; 
private Semaphore m_full = new Semaphore(BUFFER_SIZE); 
private Semaphore m_empty = new Semaphore(0); 
private AtomicInteger m_writePointer = new AtomicInteger(); 
private AtomicInteger m_readPointer = new AtomicInteger(); 

public BoundedBuffer() { 
    m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE); 
} 

public static int safeGetAndIncrement(AtomicInteger i) { 
    int oldValue = 0, newValue = 0; 
    do { 
     oldValue = i.get(); 
     newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1); 
    } while (!i.compareAndSet(oldValue, newValue)); 
    return oldValue; 
} 

public void add(T data) throws InterruptedException { 
    m_full.acquire(); 
    synchronized (this) { // << Commenting this doesn't work 
     // CAS-based overflow handling 
     m_buffer.set(safeGetAndIncrement(m_writePointer),data); 
    } 
    m_empty.release(); 
} 

public T get() throws InterruptedException { 
    T data = null; 
    m_empty.acquire(); 
    synchronized (this) { // << Commenting this doesn't work 
     // CAS-based overflow handling 
     data = m_buffer.get(safeGetAndIncrement(m_readPointer)); 
    } 
    m_full.release(); 
    return data; 
} 
} 

測試程序有...

8生產者線程,每年投入約10000項到隊列。 每個條目是格式的字符串:A「:」 B 其中 A是數字0..7用於8個線程。 B是距離0..9999

4消費者線程,消耗的一切數字增加計數,直到空被擊中。

一旦生產線已經完成了添加什麼東西都往緩衝,4空的被添加到隊列(停止消費者)。

輸出線程...

,P:數據,1:9654 @ 1 ,P:數據,5:1097 @ 347 C:數據,1:9654 @ 1 ,P:數據4:5538 @ 1 C:數據,4:5538 @ 1個 C:數據,空@ 14466

驗證 當驗證,如果印製的所有條目都是消費者,少數出現缺失(之前arrayindexoutofbounds被擊中(可能只是巧合)。

驗證中... 失蹤4:5537 缺少5:1096 驗證

+0

「我看過缺失的條目」對於任何人都沒有足夠的信息來弄清楚你的意思。 –

+0

Missing =消費者沒有看到生產者放入的所有條目。示例輸出....開始:0 開始:2 開始:1 開始:3 ,P:數據,1:9654 @ 1 ,P:數據,5:1097 @ 347 C:數據,1:9654 @ 1 ,P:數據, 4:5538 @ 1個 C:數據,4:5538 @ 1 完成:0 完成:3 完成:1 C:數據,空@ 14466 表面處理:2 驗證... 缺少4:5537 Missing 5:1096 已驗證 – TechnoJab

回答

2

增加計數器時,您需要處理溢出。例如,你可以使用下面的方法,而不是getAndIncrement()

public static int safeGetAndIncrement(AtomicInteger i) { 
    int oldValue = 0; 
    do { 
     oldValue = i.get(); 
     int newValue = (oldValue == MAX_VALUE) ? 0 : (oldValue + 1); 
    } while (!i.compareAndSet(oldValue, newValue)); 
    return oldValue; 
} 

它採用了典型的compare-and-swap方法,自getAndIncrement()被以同樣的方式在內部實現不應該影響性能。

此外,如果MAX_VALUEBUFFER_SIZE您不需要% BUFFER_SIZE操作。

+0

兩個線程不能同時輸入您的safeGetAndIncrement方法,然後獲取相同的oldValue並將其增加以獲得相同的newValue?那不行,現在會嗎? – TechnoJab

+0

@TechnoJab:這就是比較和交換方法的工作方式:兩個線程可以進入塊,但只有一個可以成功執行'compareAndSet()'(查看它的javadoc),另一個將會重試。 – axtavt

+0

謝謝 - 好吧,那可行,我想我理解CAS溢出處理方法。但我現在有另一個問題了(我發佈了一個全新的問題)。 – TechnoJab