我正在嘗試創建一個SingleBlockingQueue<T>
同步器,它允許一個線程將offer()
一個元素給它,而另一個線程將它take()
它。一次只有一個T
元素被保存在SingleBlockingQueue<T>
內部,並且如果前一個元素正在等待其線程爲take()
,則推送線程在offer()
上被阻止。推動線程將繼續推送物品,直到它調用setComplete()
,並且線程將繼續呼叫take()
而isComplete()
爲假。如果線程正在等待某個元素,它將被阻塞。創建SingleBlockingQueue同步器
這是我到目前爲止的同步器。
import java.util.concurrent.atomic.AtomicBoolean;
public final class SingleBlockingQueue<T> {
private volatile T value;
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final AtomicBoolean isPresent = new AtomicBoolean(false);
public void offer(T value) throws InterruptedException {
while (isPresent.get()) {
this.wait();
}
this.value = value;
synchronized(this) {
this.notifyAll();
}
}
public boolean isComplete() {
return !isPresent.get() && isComplete.get();
}
public void setComplete() {
isComplete.set(true);
}
public T take() throws InterruptedException {
while (!isPresent.get()) {
this.wait();
}
T returnValue = value;
isPresent.set(false);
synchronized(this) {
this.notifyAll();
}
return returnValue;
}
}
這裏是科特林
val queue = SingleBlockingQueue<Int>()
thread {
for (i in 1..1000) {
queue.offer(i)
}
queue.setComplete()
}
thread {
while (!queue.isComplete) {
println(queue.take())
}
}
Thread.sleep(100000)
不過,我得到一個錯誤的使用例子,我在我頭上了一下,在這一點上。感謝RxJava,我很久沒有做同步器。我究竟做錯了什麼?
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18)
不應該等待()在同步塊內調用嗎? –
我試過,但有同樣的問題。也許我需要創建兩個獨立的鎖?另外,對於我之前寫過的同步器沒有這樣做,也沒有任何問題。 https://github.com/thomasnield/tom-sync/blob/master/src/main/java/org/nield/concurrency/BufferedLatch.java – tmn
github代碼給出了同樣的錯誤,如果我直接調用await()[Just嘗試在我當地的日食]。在調用await()之前,您可能會在與該對象關聯的監視器上鎖定一個鎖。 –