2016-05-23 149 views
2

我想向ArrayBlockingQueue添加功能,特別是我希望隊列只保留唯一的元素,即如果隊列中已包含條目,則不列入條目。 由於我期望的功能與JCIP項目4.4中Vector的擴展相同,因此我嘗試使用其中的方法來實現它。向ArrayBlockingQueue添加功能

  • 實現通過擴展不起作用,因爲ArrayBlockingQueue使用包專用的ReentrantLock,從而延伸類我無法得到它的參考實現其相互排斥。即使它工作,這也是一種脆弱的方法。
  • 通過客戶端鎖定實現不起作用,因爲沒有客戶端鎖定支持。
  • 實現由組成似乎是要走的路在第一,生產代碼,比如

    public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 
        private final BlockingQueue<E> backingQueue; 
    
        public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
         this.backingQueue = backingQueue; 
        } 
    
        @Override 
        public synchronized boolean offer(E e) { 
         if (backingQueue.contains(e)) { 
          return false; 
         } 
    
         return backingQueue.offer(e); 
        } 
    
        @Override 
        public synchronized E take() throws InterruptedException { 
         return backingQueue.take(); 
        } 
    
        // Other methods... 
    } 
    

    不幸的是,撰寫的ArrayBlockingQueue時,這種方法產生在以下簡單的場景死鎖:

    1. 線程A調用take()並獲取同步的鎖和ArrayBlockingQueue的內部鎖。
    2. 線程A在看到隊列爲空並釋放ArrayBlockingQueue的內部鎖時發生阻塞。
    3. 線程B使用一個元素調用offer(),但無法獲取同步的鎖,永遠阻塞。

我的問題是,怎麼可能不重寫ArrayBlockingQueue來實現這一功能?

回答

3

也許一個簡單而快速的解決辦法是使用java.util.concurrent.ConcurrentMap

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 

    private final BlockingQueue<E> backingQueue; 
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
    } 

    @Override 
    public boolean offer(E e) { 
     boolean[] add = {false}; 
     elements.computeIfAbsent(e, k -> add[0] = true); 
     return add[0] && backingQueue.offer(e); 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     elements.remove(e); 
     return e; 
    } 

    // Other methods 

} 

注意,沒有必要爲同步

編輯

java.util.concurrent.ConcurrentHashMap文檔說:

/** 
* If the specified key is not already associated with a value, 
* attempts to compute its value using the given mapping function 
* and enters it into this map unless {@code null}. The entire 
* method invocation is performed atomically, so the function is 
* applied at most once per key. Some attempted update operations 
* on this map by other threads may be blocked while computation 
* is in progress, so the computation should be short and simple, 
* and must not attempt to update any other mappings of this map. 
* 
* @param key key with which the specified value is to be associated 
* @param mappingFunction the function to compute a value 
* @return the current (existing or computed) value associated with 
*   the specified key, or null if the computed value is null 
* @throws NullPointerException if the specified key or mappingFunction 
*   is null 
* @throws IllegalStateException if the computation detectably 
*   attempts a recursive update to this map that would 
*   otherwise never complete 
* @throws RuntimeException or Error if the mappingFunction does so, 
*   in which case the mapping is left unestablished 
*/ 
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { 
    ... 
} 

我已經添加了一些額外的檢查:

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 

    private final BlockingQueue<E> backingQueue; 
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
    } 

    @Override 
    public boolean offer(E e) { 
     boolean[] add = {false}; 
     elements.computeIfAbsent(e, k -> add[0] = true); 
     if (add[0]) { 
      // make sure that the element was added to the queue, 
      // otherwise we must remove it from the map 
      if (backingQueue.offer(e)) { 
       return true; 
      } 
      elements.remove(e); 
     } 
     return false; 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     elements.remove(e); 
     return e; 
    } 

    @Override 
    public String toString() { 
     return backingQueue.toString(); 
    } 

    // Other methods 

} 

和...讓我們做一些併發測試:

BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100)); 

int n = 1000; 
ExecutorService producerService = Executors.newFixedThreadPool(n); 

Callable<Void> producer =() -> { 
    queue.offer("a"); 
    return null; 
}; 

producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList())); 
producerService.shutdown(); 

System.out.println(queue); // prints [a] 
+0

感謝您的回答,但您的解決方案並不好。兩個線程嘗試提供相同的項目時出現問題。假設第一個線程到達它的return語句並獲得上下文切換,那麼第二個線程開始運行,當它到達它的返回語句時,e將被提供,然後當第一個線程被重新調度時,e將被再次提供。我們必須確保地圖和隊列不會在缺席檢查和報價之間進行修改,這就是爲什麼我們必須使用鎖定,或者更聰明的手段來同步。 –

+1

@DLevant這絕不會發生。 'computeIfAbsent'完成訣竅...查看更新。 – FaNaJ

+0

在第一個線程的add [0]被設置爲true之後,沒有什麼能夠防止另一個線程被調度,在這種情況下,可以添加兩次相同的元素。當涉及到併發問題時,這些測試幾乎無法證明。嘗試添加諸如notify()之類的東西;等待(1000);之前如果(添加[0])在你的代碼中,我相信你應該看看我在說什麼。 –

1

我找到了我的問題的部分答案。報價操作不是我想要的,但是隊列是不同的。

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 
    private final BlockingQueue<E> backingQueue; 
    private final Set<E> entriesSet = ConcurrentHashMap.newKeySet(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
     entriesSet.addAll(backingQueue); 
    } 

    @Override 
    public boolean offer(E e) { 
     if (!entriesSet.add(e)) 
      return false; 

     boolean added = backingQueue.offer(e); 
     if (!added) { 
      entriesSet.remove(e); 
     } 

     return added; 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     entriesSet.remove(e); 

     return e; 
    } 

    // Other methods... 
} 

額外的集合不是問題,因爲我會想要使用一個以獲得合理的性能。然而,我可以想到這個實現的一個問題,如果它與有界的隊列實現(例如ArrayBlockingQueue)一起使用,那麼該集合將不受限制,因此當有很多提供阻止。

這個解決方案劃分了一個顯然應該是原子的操作,所以我高度懷疑應該存在我忽略的其他問題。