2016-08-06 109 views
2

是否有Java中的優先級隊列,其行爲類似於LinkedBlockingQueue?BoundedPriorityBlockingQueue - 線程安全,阻塞和有界?

PriorityBlockingQueue沒有被阻塞,因爲它是無界的。

+0

你的....單詞不會有多大意義 - 如果你使用阻塞函數,'PriorityBlockignQueue'將**總是**阻塞...如果你希望它被限制,只需創建你自己的類,它擴展了PriorityBlockingQueue和' @ Override'阻塞函數 - 檢查你的限制並調用'super'方法,否則阻止 – specializt

+0

@specializt:實際上,這似乎沒有多大意義。它是**無限的**,所以它不會阻塞'put(E)'。從JavaDoc中:「由於隊列是無界的,這個方法永遠不會阻塞」。此外,不會簡單地覆蓋併發集合。 – beatngu13

+0

問題是,'put'沒有阻塞,因爲隊列是無界的。我想知道是否有類似的實施,其中'put'被阻止 – barracuda317

回答

1

您可以從谷歌番石榴嘗試MinMaxPriorityQueue並設置爲下一個最大尺寸:

Queue<User> users = Queues.synchronizedQueue(
    MinMaxPriorityQueue.orderedBy(userComparator) 
     .maximumSize(1000) 
     .create() 
); 

注意:作爲MinMaxPriorityQueue不是線程安全的,你需要使用裝飾Queues.synchronizedQueue(Queue)允許使它線程安全。

,因爲你需要一個BlockingQueue你將必須實現自己的裝飾這是不難實現的。

這是應該的樣子:

public class SynchronizedBlockingQueue implements BlockingQueue { 

    private final BlockingQueue queue; 

    public SynchronizedBlockingQueue(BlockingQueue queue) { 
     this.queue = queue; 
    } 

    @Override 
    public synchronized boolean add(final Object o) { 
     return this.queue.add(o); 
    } 

    @Override 
    public synchronized boolean offer(final Object o) { 
     return this.offer(o); 
    } 
    ... 
} 

然後代碼來創建BlockingQueue將是:

BlockingQueue<User> users = new SynchronizedBlockingQueue(
    MinMaxPriorityQueue.orderedBy(userComparator) 
     .maximumSize(1000) 
     .create() 
); 
+0

據我所知,這個實現不是線程安全的。有很多線程與兩個隊列進行交互(put和take)。如果實施不是線程安全的,我面臨的困難是什麼? – barracuda317

0

如果您不需要一個完全成熟的BlockingQueue接口實現再你可以使用Semaphore和類似的東西(在Kotlin中):

interface BlockingBag<E: Any> { 
    @Throws(InterruptedException::class) 
    fun put(element: E) 
    @Throws(InterruptedException::class) 
    fun take(): E 
} 
class BlockingPriorityBag<E: Any>(val capacity: Int) : BlockingBag<E> { 
    init { 
     require(capacity >= 1) { "$capacity must be 1 or greater" } 
    } 
    private val queue = PriorityBlockingQueue<E>() 
    private val semaphore = Semaphore(capacity) 
    override fun take(): E { 
     val item = queue.take() 
     semaphore.release() 
     return item 
    } 
    override fun put(element: E) { 
     semaphore.acquire() 
     queue.put(element) 
    } 
}