也許一個簡單而快速的解決辦法是使用java.util.concurrent.ConcurrentMap
:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
return add[0] && backingQueue.offer(e);
}
@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}
// Other methods
}
注意,沒有必要爲同步。
編輯:
在java.util.concurrent.ConcurrentHashMap
文檔說:
/**
* If the specified key is not already associated with a value,
* attempts to compute its value using the given mapping function
* and enters it into this map unless {@code null}. The entire
* method invocation is performed atomically, so the function is
* applied at most once per key. Some attempted update operations
* on this map by other threads may be blocked while computation
* is in progress, so the computation should be short and simple,
* and must not attempt to update any other mappings of this map.
*
* @param key key with which the specified value is to be associated
* @param mappingFunction the function to compute a value
* @return the current (existing or computed) value associated with
* the specified key, or null if the computed value is null
* @throws NullPointerException if the specified key or mappingFunction
* is null
* @throws IllegalStateException if the computation detectably
* attempts a recursive update to this map that would
* otherwise never complete
* @throws RuntimeException or Error if the mappingFunction does so,
* in which case the mapping is left unestablished
*/
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
...
}
我已經添加了一些額外的檢查:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
if (add[0]) {
// make sure that the element was added to the queue,
// otherwise we must remove it from the map
if (backingQueue.offer(e)) {
return true;
}
elements.remove(e);
}
return false;
}
@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}
@Override
public String toString() {
return backingQueue.toString();
}
// Other methods
}
和...讓我們做一些併發測試:
BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100));
int n = 1000;
ExecutorService producerService = Executors.newFixedThreadPool(n);
Callable<Void> producer =() -> {
queue.offer("a");
return null;
};
producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList()));
producerService.shutdown();
System.out.println(queue); // prints [a]
感謝您的回答,但您的解決方案並不好。兩個線程嘗試提供相同的項目時出現問題。假設第一個線程到達它的return語句並獲得上下文切換,那麼第二個線程開始運行,當它到達它的返回語句時,e將被提供,然後當第一個線程被重新調度時,e將被再次提供。我們必須確保地圖和隊列不會在缺席檢查和報價之間進行修改,這就是爲什麼我們必須使用鎖定,或者更聰明的手段來同步。 –
@DLevant這絕不會發生。 'computeIfAbsent'完成訣竅...查看更新。 – FaNaJ
在第一個線程的add [0]被設置爲true之後,沒有什麼能夠防止另一個線程被調度,在這種情況下,可以添加兩次相同的元素。當涉及到併發問題時,這些測試幾乎無法證明。嘗試添加諸如notify()之類的東西;等待(1000);之前如果(添加[0])在你的代碼中,我相信你應該看看我在說什麼。 –