2016-05-25 68 views
0

我正在嘗試創建一個SingleBlockingQueue<T>同步器,它允許一個線程將offer()一個元素給它,而另一個線程將它take()它。一次只有一個T元素被保存在SingleBlockingQueue<T>內部,並且如果前一個元素正在等待其線程爲take(),則推送線程在offer()上被阻止。推動線程將繼續推送物品,直到它調用setComplete(),並且線程將繼續呼叫take()isComplete()爲假。如果線程正在等待某個元素,它將被阻塞。創建SingleBlockingQueue同步器

這是我到目前爲止的同步器。

import java.util.concurrent.atomic.AtomicBoolean; 

public final class SingleBlockingQueue<T> { 

    private volatile T value; 
    private final AtomicBoolean isComplete = new AtomicBoolean(false); 
    private final AtomicBoolean isPresent = new AtomicBoolean(false); 

    public void offer(T value) throws InterruptedException { 
     while (isPresent.get()) { 
      this.wait(); 
     } 
     this.value = value; 
     synchronized(this) { 
      this.notifyAll(); 
     } 
    } 
    public boolean isComplete() { 
     return !isPresent.get() && isComplete.get(); 
    } 
    public void setComplete() { 
     isComplete.set(true); 
    } 
    public T take() throws InterruptedException { 
     while (!isPresent.get()) { 
      this.wait(); 
     } 
     T returnValue = value; 
     isPresent.set(false); 
     synchronized(this) { 
      this.notifyAll(); 
     } 
     return returnValue; 
    } 
} 

這裏是科特林

val queue = SingleBlockingQueue<Int>() 

    thread { 
     for (i in 1..1000) { 
      queue.offer(i) 
     } 
     queue.setComplete() 
    } 

    thread { 
     while (!queue.isComplete) { 
      println(queue.take()) 
     } 
    } 

    Thread.sleep(100000) 

不過,我得到一個錯誤的使用例子,我在我頭上了一下,在這一點上。感謝RxJava,我很久沒有做同步器。我究竟做錯了什麼?

Exception in thread "Thread-1" java.lang.IllegalMonitorStateException 
    at java.lang.Object.wait(Native Method) 
    at java.lang.Object.wait(Object.java:502) 
    at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8) 
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18) 
+3

不應該等待()在同步塊內調用嗎? –

+0

我試過,但有同樣的問題。也許我需要創建兩個獨立的鎖?另外,對於我之前寫過的同步器沒有這樣做,也沒有任何問題。 https://github.com/thomasnield/tom-sync/blob/master/src/main/java/org/nield/concurrency/BufferedLatch.java – tmn

+0

github代碼給出了同樣的錯誤,如果我直接調用await()[Just嘗試在我當地的日食]。在調用await()之前,您可能會在與該對象關聯的監視器上鎖定一個鎖。 –

回答

1

正如其他人所指出的,您可以使用SynchronousQueue中的現有實現。

如果您想實現自己的功能,則非常接近,只需確保wait()的調用位於​​區塊內。

不幸的是,我相信你的原代碼isComplete()/setComplete()機制是受競爭條件,爲isComplete()返回之後false之前或者甚至在讀線程執行take()setComplete()可以被調用。這可能會掛起閱讀線程。

public final class SingleBlockingQueue<T> { 
    private final Object lock = new Object(); 
    private T value; 
    private boolean present = false; 

    public void offer(T value) throws InterruptedException { 
     synchronized (lock) { 
     while (present) 
      lock.wait(); 
     this.value = value; 
     present = true; 
     lock.notifyAll(); 
     } 
    } 

    public T take() throws InterruptedException { 
     synchronized (lock) { 
     while (!present) 
      lock.wait(); 
     T returnValue = value; 
     value = null; // Should release reference 
     present = false; 
     lock.notifyAll(); 
     return returnValue; 
     } 
    } 
    } 

爲了進行比較,可以更自然地實現這種隊列的基礎上SemaphoreCondition對象。這是一個使用一對信號量來表示空/滿條件的實現。

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore full = new Semaphore(0); 
    private final Semaphore empty = new Semaphore(1); 

    public void offer(T value) throws InterruptedException { 
     empty.acquire(); 
     this.value = value; 
     full.release(); 
    } 

    public T take() throws InterruptedException { 
     full.acquire(); 
     T returnValue = value; 
     value = null; // Should release reference 
     empty.release(); 
     return returnValue; 
    } 
    } 
+0

The Semaphore方法非常簡單,謝謝。最後一個問題。我如何安全地溝通沒有更多的項目?我在客戶端使用'AtomicBoolean'正面測試了這個bejeezus,但我仍然很謹慎。 – tmn

+0

這是我在Kotlin的用法。雖然我無法證明它,但我擔心'take()'上的最後一個元素調用可能會被忽略https://gist.github.com/thomasnield/a3f7981ea447e0c049ba5943afa44fb8#file-singleblockingqueue-usage-kt – tmn

+0

哈,證明了我的擔心是正確的。我用'睡眠()'和一個評論顯示了哪裏會出軌。 – tmn

2

你不需要自己實現它,你可以使用SynchronousQueue

參考文獻:

SynchronousQueue javadoc

http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html

的的SynchronousQueue類實現了BlockingQueue接口。 閱讀BlockingQueue文本以獲取有關界面的更多信息。

SynchronousQueue是一個內部只能包含單個元素 的隊列。將一個元素插入隊列的線程被阻塞 ,直到另一個線程從隊列中獲取該元素。同樣,如果 線程嘗試獲取元素並且當前沒有元素存在,則線程將被阻塞,直到線程將一個元素插入 隊列。

+1

「SynchronousQueue」的Javadoc描述是誤導性的,它並不具備能力可以存儲一個元素:容量是_zero_。對行爲的描述是關鍵:生產者線程不能僅僅放置()一個元素並離開。任何'q.put(e)'調用都會被阻塞,直到消費者調用q.take()'。這使得「SynchronousQueue」的行爲與OP描述的行爲不同。 (當然,它與OP實際上_wants_是不同的是一個完全不同的問題) –

+0

是的,如果它正在等待'take()'去除單個元素,我想'put()'被阻塞並釋放插槽。 – tmn

+1

實際上,Java8 JDK中的文檔說:「一個同步隊列沒有任何內部容量,甚至不能有一個容量。」(強調,我的。) –

0

只是說明我有一些問題與ResultSet雀躍着由於在RxJava-JDBC框架next()通話時間。我用這個實現去修改了前面給出的答案。

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore nextGate = new Semaphore(0); 
    private final Semaphore waitGate = new Semaphore(0); 

    private volatile boolean hasValue = true; 
    private volatile boolean isFirst = true; 

    public void offer(T value) throws InterruptedException { 
     if (isFirst) { 
      nextGate.acquire(); 
      isFirst = false; 
     } 
     this.value = value; 
     waitGate.release(); 
     nextGate.acquire(); 
    } 

    public T take() throws InterruptedException { 
     T returnValue = value; 
     value = null; // Should release reference 
     return returnValue; 
    } 
    public boolean next() throws InterruptedException { 
     nextGate.release(); 
     waitGate.acquire(); 
     return hasValue; 
    } 
    public void setDone() { 
     hasValue = false; 
     waitGate.release(); 
    } 
} 

這是我使用它:翻動RxJava Observable<T>到科特林一個Sequence<T>

import com.github.davidmoten.rx.jdbc.QuerySelect 
import rx.Observable 
import rx.Scheduler 
import rx.lang.kotlin.subscribeWith 
import java.io.Closeable 

class ObservableIterator<T>(
     observable: Observable<T> 
) : Iterator<T>, Closeable { 

    private val queue = SingleBlockingQueue<T>() 

    private val subscription = 
      observable 
        .subscribeWith { 
         onNext { queue.offer(it) } 
         onCompleted { queue.setDone() } 
         onError { queue.setDone() } 
        } 

    override fun hasNext(): Boolean { 
     return queue.next() 
    } 

    override fun next(): T { 
     return queue.take() 
    } 
    override fun close() { 
     subscription.unsubscribe() 
     queue.setDone() 
    } 
} 

fun <T> Observable<T>.asSequence() = ObservableIterator(this).asSequence() 

fun QuerySelect.Builder.asSequence(scheduler: Scheduler) = get { it } 
     .subscribeOn(scheduler) 
     .asSequence() 
+1

請注意,當返回'Sequence'沒有被完全消耗時,對observable的訂閱會永遠在'onNext'中被阻塞。 – Ilya