2011-09-10 29 views
1

屬性我想建立...這個BoundedBuffer類的問題在哪裏?有界緩衝區類的

  • 多生產,多消費。
  • 阻止生產者和阻止消費者。
  • 使用AtomicInteger作爲讀/寫指針。
  • 使用AtomicReferenceArray(取一般類型)來保存緩衝區。
  • 緩衝區是Short.MAX_VALUE大小,它使用CAS來處理溢出。

現在的問題...

問題:我似乎無法註釋掉同步(這)塊在下面的代碼。我認爲使用AtomicInteger作爲指針的重點是避免這樣做。

註釋掉缺少消費者,生產商已經把一些項目同步(這)塊的產量。如果我有同步(這)塊,一切的偉大和生產消耗的每一件事情。

我錯過了什麼?

public class BoundedBuffer<T> { 
    private static final int BUFFER_SIZE = Short.MAX_VALUE+1; 
    private AtomicReferenceArray<T> m_buffer = null; 
    private Semaphore m_full = new Semaphore(BUFFER_SIZE); 
    private Semaphore m_empty = new Semaphore(0); 
    private AtomicInteger m_writePointer = new AtomicInteger(); 
    private AtomicInteger m_readPointer = new AtomicInteger(); 

    public BoundedBuffer() { 
     m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE); 
    } 

    public static int safeGetAndIncrement(AtomicInteger i) { 
     int oldValue = 0, newValue = 0; 
     do { 
      oldValue = i.get(); 
      newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1); 
     } while (!i.compareAndSet(oldValue, newValue)); 
     return oldValue; 
    } 

    public void add(T data) throws InterruptedException { 
     m_full.acquire(); 
     synchronized (this) { // << Commenting this doesn't work 
      // CAS-based overflow handling 
      m_buffer.set(safeGetAndIncrement(m_writePointer),data); 
     } 
     m_empty.release(); 
    } 

    public T get() throws InterruptedException { 
     T data = null; 
     m_empty.acquire(); 
     synchronized (this) { // << Commenting this doesn't work 
      // CAS-based overflow handling 
      data = m_buffer.get(safeGetAndIncrement(m_readPointer)); 
     } 
     m_full.release(); 
     return data; 
    } 
} 
+0

可能的重複[在java中編寫線程安全的阻塞有界緩衝區結構時如何處理索引溢出?](http://stackoverflow.com/questions/7374032/how-do-you-deal-with -index-overflow-when-writing-a-thread-safe-blocking-bounded-b) –

+0

在'safeGetAndIncrement'中編寫了'...(oldValue == Short.MAX_VALUE)...'。這應該是'...(oldValue == BUFFER_SIZE - 1)...'。這是一個小問題,但除此之外,如果您決定將BUFFER_SIZE更改爲其他值,則需要更改此行,否則代碼將會中斷。 –

+0

@Adrian我已經修復了,因爲我發佈了代碼。我也意識到這一點。感謝您指出了這一點。 – TechnoJab

回答

1

有可能是一個問題,即當同步塊被去除的get()從所述陣列不是原子與增量。我推測的中斷場景要求生產者超越消費者,那麼如果信號量釋放是由無序讀取觸發的,那麼您可以讓生產者覆蓋尚未讀取的數組條目。

考慮這種情況,其中該緩衝器是滿的(作家指數爲N,讀者指數處於N + 1)和2個線程正在努力從緩衝器讀取。 (假設N是不靠近環繞點爲簡單起見。)

線程1接收來自讀取其項的索引N + 1。

線程2接收來自讀取其項的索引N + 2。

由於調度的僥倖,線程2首先從緩衝區數組中獲取,並在線程1從數組中獲取它的項之前釋放信號量m_full

線程3(生產者)喚醒並在緩衝器中寫入一個項到下一個可用時隙N + 1,也之前線索1已經從緩衝器中讀取。

線程1,則獲得了該項目的索引N + 1,但已經錯過了它想要的選項。

+0

聽起來似乎合理,我現在也看到了。我想知道是否有修復?我試圖將最初發出的完整信號量減少到緩衝區大小的一半 - 這樣緩衝區中總是有大約1/2的空間,並且讀/寫指針總是相隔很遠。這仍然不能解決它。 – TechnoJab

+0

可能還有別的東西。是否可以添加一些日誌記錄來查看正在發生的事情,還是會導致問題消失? –

+0

您也可以嘗試僅刪除一個或其他已同步的塊,以查看它在put/get的一側是否僅存在問題。 –