我有一個單線程生產者,它創建一些任務對象,然後將其添加到ArrayBlockingQueue
(它是固定大小)。阻塞隊列和多線程消費者,如何知道何時停止
我也啓動了一個多線程的使用者。這是作爲固定線程池(Executors.newFixedThreadPool(threadCount);
)生成的。然後,我將一些ConsumerWorker實例提交給此ThreadPool,每個ConsumerWorker都具有對上述ArrayBlockingQueue實例的引用。
每個這樣的工人都會在隊列上做一個take()
並處理任務。
我的問題是,有什麼最好的方式讓工人知道什麼時候不會有任何工作要做。換句話說,我如何告訴工人生產者已經完成加入隊列,並且從現在開始,每個工人在看到隊列是空的時候應該停下來。
我現在得到的是一個安裝程序,其中我的Producer使用一個回調進行初始化,當他完成作業(將東西添加到隊列中)時觸發回調。我還保留了我創建並提交給ThreadPool的所有ConsumerWorkers的列表。當Producer Callback告訴我製作人完成時,我可以告訴每個工作人員。在這一點上,他們應該簡單地繼續檢查隊列是否爲空,並且當它變爲空時,它們應該停止,從而允許我正常關閉下ExecutorService線程池。這件事情是這樣
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
這裏的問題是,我有一個明顯的競爭條件,其中有時製片人將完成,信號,它和ConsumerWorkers將停止之前消耗在隊列中的一切。
我的問題是什麼是最好的方式來同步這個,以便它的一切正常?我應該同步它檢查生產者是否正在運行的整個部分,以及隊列是否爲空,並且從隊列中取一些東西(在隊列對象上)?我是否應該同步ConsumerWorker實例上的isRunning
布爾值的更新?任何其他建議?
UPDATE,這裏的工作實現,我已經結束了使用:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
這是由以下JohnVint的答案很大程度上鼓舞,只有一些小的修改。
===更新歸因於@ vendhan的評論。
謝謝你的觀察。你是對的,這個問題中的第一部分代碼有(其他問題)while(isRunning || !inputQueue.isEmpty())
沒有意義的代碼。
在我的實際最終實施中,我做了一些更接近你的建議,即將「||」 (或)與「& &」(和),在這種意義上,每個工人(消費者)現在只檢查他從列表中獲得的元素是否是毒丸,如果停止(理論上我們可以說工人必須運行並且隊列不能爲空)。
一個executorService已經有一個隊列,所以你不需要另一個隊列。您可以使用shutdown()啓動整個執行程序服務。 –
@PeterLawrey對不起,但我不明白你的評論... –
作爲一個ExecutorService已經有一個隊列,你可以添加任務,它不需要額外的隊列,也不需要工作了解如何阻止它們,因爲這已經實施。 –