2013-05-16 28 views
5

我有N個共享要計算的元素隊列的工作人員。在每次迭代中,每個工作人員從隊列中移除一個元素,並可以生成更多要計算的元素,這些元素將放入同一隊列中。基本上,每個生產者也是一個消費者。當隊列中沒有元素並且所有工作人員已經完成計算當前元素(因此不會產生更多要計算的元素)時,計算結束。我想避免一個調度員/協調員,所以工人應該協調。允許工人查明停止條件是否有效,並因此代表其他人停止計算的最佳模式是什麼?具有停止條件的Java生產者 - 消費者

例如,如果所有線程只想做這個循環中,當元件被所有計算,這將導致被永遠阻斷所有線程:

while (true) { 
    element = queue.poll(); 
    newElements[] = compute(element); 
    if (newElements.length > 0) { 
     queue.addAll(newElements); 
    } 
} 

回答

6

維護活動線程的計數。

public class ThreadCounter { 
    public static final AtomicInteger threadCounter = new AtomicInteger(N); 
    public static final AtomicInteger queueCounter = new AtomicInteger(0); 
    public static final Object poisonPill = new Object(); 
    public static volatile boolean cancel = false; // or use a final AomticBoolean instead 
} 

你線程的輪詢循環應該像下面的(我假設你使用的是BlockingQueue

while(!ThreadCounter.cancel) { 
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking 
    if(threadCount == 0 && ThreadCounter.queueCounter.get() == 0) { 
     ThreadCounter.cancel = true; 
     queue.offer(ThreadCounter.poisonPill); 
    } else { 
     Object obj = queue.take(); 
     ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking 
     ThreadCounter.queueCounter.decrementAndGet(); 
     if(obj == ThreadCounter.poisonPill) { 
      queue.offer(obj); // send the poison pill back through the queue so the other threads can read it 
      continue; 
     } 
    } 
} 

如果一個線程要在BlockingQueue塊則遞減櫃檯;如果所有線程已經在隊列中等待(意思是counter == 0),那麼最後一個線程將cancel設置爲true,然後通過隊列發送毒丸以喚醒其他線程;每個線程都會看到毒丸,通過隊列將其返回以喚醒剩餘的線程,然後在看到cancel設置爲true時退出循環。

編輯:我已經加入了queueCounter維護對象的數量的計數在隊列中刪除的數據比賽(很明顯,你還需要增加一個queueCounter.incrementAndGet()通話,無論你要添加的對象到隊列)。這工作如下:如果threadCount == 0,但queueCount != 0,則這意味着一個線程剛剛從隊列中刪除了一個項目,但尚未調用threadCount.getAndIncrement,因此取消變量是而不是設置爲true。 threadCount.getAndIncrement呼叫在queueCount.getAndDecrement呼叫之前很重要,否則您仍然會有數據競爭。不管你調用queueCount.getAndIncrement的什麼順序,你都不會將它與threadCount.getAndDecrement的調用交錯(後者將在循環結束時被調用,前者將在循環開始時被調用)。

請注意,您不能只使用queueCount來確定何時結束進程,因爲線程可能仍然處於活動狀態,但尚未將任何數據放入隊列中 - 換句話說,queueCount將爲零,但會一旦線程完成當前的迭代,它將不爲零。

而不是通過隊列重複發送poisonPill,而是取消線程發送(N-1)poisonPills通過隊列。如果你使用不同的隊列使用這種方法,請謹慎,因爲某些隊列(例如亞馬遜的簡單隊列服務)可能會返回多個項目的等效方法,在這種情況下,您需要重複發送poisonPill以確保一切都關閉了。

此外,而是採用了while(!cancel)循環,你可以使用一個while(true)循環,並打破當環路檢測poisonPill

+0

我想取消應該是波動? – marcorossi

+0

當一個線程位於對queue.take()和incrementAndGet()的調用之間時,會發生什麼,而另一個線程正在計算count == 0和queue.isEmpty()條件? – marcorossi

+0

@marcorossi yeah'cancel'應該是易變的,因此所有線程都可以看到變化。例如,所有線程都可以看到最近寫入「cancel」字段,它保證讀寫操作是原子性的。已編輯答案。 – adamjmarkham