維護活動線程的計數。
public class ThreadCounter {
public static final AtomicInteger threadCounter = new AtomicInteger(N);
public static final AtomicInteger queueCounter = new AtomicInteger(0);
public static final Object poisonPill = new Object();
public static volatile boolean cancel = false; // or use a final AomticBoolean instead
}
你線程的輪詢循環應該像下面的(我假設你使用的是BlockingQueue
)
while(!ThreadCounter.cancel) {
int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
ThreadCounter.cancel = true;
queue.offer(ThreadCounter.poisonPill);
} else {
Object obj = queue.take();
ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
ThreadCounter.queueCounter.decrementAndGet();
if(obj == ThreadCounter.poisonPill) {
queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
continue;
}
}
}
如果一個線程要在BlockingQueue
塊則遞減櫃檯;如果所有線程已經在隊列中等待(意思是counter == 0
),那麼最後一個線程將cancel
設置爲true,然後通過隊列發送毒丸以喚醒其他線程;每個線程都會看到毒丸,通過隊列將其返回以喚醒剩餘的線程,然後在看到cancel
設置爲true時退出循環。
編輯:我已經加入了queueCounter
維護對象的數量的計數在隊列中刪除的數據比賽(很明顯,你還需要增加一個queueCounter.incrementAndGet()
通話,無論你要添加的對象到隊列)。這工作如下:如果threadCount == 0
,但queueCount != 0
,則這意味着一個線程剛剛從隊列中刪除了一個項目,但尚未調用threadCount.getAndIncrement
,因此取消變量是而不是設置爲true。 threadCount.getAndIncrement
呼叫在queueCount.getAndDecrement
呼叫之前很重要,否則您仍然會有數據競爭。不管你調用queueCount.getAndIncrement
的什麼順序,你都不會將它與threadCount.getAndDecrement
的調用交錯(後者將在循環結束時被調用,前者將在循環開始時被調用)。
請注意,您不能只使用queueCount
來確定何時結束進程,因爲線程可能仍然處於活動狀態,但尚未將任何數據放入隊列中 - 換句話說,queueCount
將爲零,但會一旦線程完成當前的迭代,它將不爲零。
而不是通過隊列重複發送poisonPill
,而是取消線程發送(N-1)poisonPills
通過隊列。如果你使用不同的隊列使用這種方法,請謹慎,因爲某些隊列(例如亞馬遜的簡單隊列服務)可能會返回多個項目的等效方法,在這種情況下,您需要重複發送poisonPill
以確保一切都關閉了。
此外,而是採用了while(!cancel)
循環,你可以使用一個while(true)
循環,並打破當環路檢測poisonPill
我想取消應該是波動? – marcorossi
當一個線程位於對queue.take()和incrementAndGet()的調用之間時,會發生什麼,而另一個線程正在計算count == 0和queue.isEmpty()條件? – marcorossi
@marcorossi yeah'cancel'應該是易變的,因此所有線程都可以看到變化。例如,所有線程都可以看到最近寫入「cancel」字段,它保證讀寫操作是原子性的。已編輯答案。 – adamjmarkham