2012-01-23 30 views
59

我有一個單線程生產者,它創建一些任務對象,然後將其添加到ArrayBlockingQueue(它是固定大小)。阻塞隊列和多線程消費者,如何知道何時停止

我也啓動了一個多線程的使用者。這是作爲固定線程池(Executors.newFixedThreadPool(threadCount);)生成的。然後,我將一些ConsumerWorker實例提交給此ThreadPool,每個ConsumerWorker都具有對上述ArrayBlockingQueue實例的引用。

每個這樣的工人都會在隊列上做一個take()並處理任務。

我的問題是,有什麼最好的方式讓工人知道什麼時候不會有任何工作要做。換句話說,我如何告訴工人生產者已經完成加入隊列,並且從現在開始,每個工人在看到隊列是空的時候應該停下來。

我現在得到的是一個安裝程序,其中我的Producer使用一個回調進行初始化,當他完成作業(將東西添加到隊列中)時觸發回調。我還保留了我創建並提交給ThreadPool的所有ConsumerWorkers的列表。當Producer Callback告訴我製作人完成時,我可以告訴每個工作人員。在這一點上,他們應該簡單地繼續檢查隊列是否爲空,並且當它變爲空時,它們應該停止,從而允許我正常關閉下ExecutorService線程池。這件事情是這樣

public class ConsumerWorker implements Runnable{ 

private BlockingQueue<Produced> inputQueue; 
private volatile boolean isRunning = true; 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(isRunning || !inputQueue.isEmpty()) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Object queueElement = inputQueue.take(); 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void setRunning(boolean isRunning) { 
    this.isRunning = isRunning; 
} 

}

這裏的問題是,我有一個明顯的競爭條件,其中有時製片人將完成,信號,它和ConsumerWorkers將停止之前消耗在隊列中的一切。

我的問題是什麼是最好的方式來同步這個,以便它的一切正常?我應該同步它檢查生產者是否正在運行的整個部分,以及隊列是否爲空,並且從隊列中取一些東西(在隊列對象上)?我是否應該同步ConsumerWorker實例上的isRunning布爾值的更新?任何其他建議?

UPDATE,這裏的工作實現,我已經結束了使用:

public class ConsumerWorker implements Runnable{ 

private BlockingQueue<Produced> inputQueue; 

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(true) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Produced queueElement = inputQueue.take(); 
      Thread.sleep(new Random().nextInt(100)); 
      if(queueElement==POISON) { 
       break; 
      } 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     System.out.println("Consumer "+Thread.currentThread().getName()+" END"); 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void stopRunning() { 
    try { 
     inputQueue.put(POISON); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
} 

}

這是由以下JohnVint的答案很大程度上鼓舞,只有一些小的修改。

===更新歸因於@ vendhan的評論。

謝謝你的觀察。你是對的,這個問題中的第一部分代碼有(其他問題)while(isRunning || !inputQueue.isEmpty())沒有意義的代碼。

在我的實際最終實施中,我做了一些更接近你的建議,即將「||」 (或)與「& &」(和),在這種意義上,每個工人(消費者)現在只檢查他從列表中獲得的元素是否是毒丸,如果停止(理論上我們可以說工人必須運行並且隊列不能爲空)。

+1

一個executorService已經有一個隊列,所以你不需要另一個隊列。您可以使用shutdown()啓動整個執行程序服務。 –

+0

@PeterLawrey對不起,但我不明白你的評論... –

+8

作爲一個ExecutorService已經有一個隊列,你可以添加任務,它不需要額外的隊列,也不需要工作了解如何阻止它們,因爲這已經實施。 –

回答

78

你應該從隊列中繼續到take()。您可以使用毒丸告訴工人停止。例如:

private final Object POISON_PILL = new Object(); 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(isRunning) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Object queueElement = inputQueue.take(); 
      if(queueElement == POISON_PILL) { 
       inputQueue.add(POISON_PILL);//notify other threads to stop 
       return; 
      } 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void finish() { 
    //you can also clear here if you wanted 
    isRunning = false; 
    inputQueue.add(POISON_PILL); 
} 
+37

+1對於我的科技詞典只有一個術語更豐富了「毒丸」 – Kiril

+2

我試過這個,它工作正常,除了一個litle修改:我必須做inputQueue .put(POISON_PILL);不提供,因爲如果我提供()並且隊列在那個時候處於滿負荷狀態(即工人們真的很懶),它不會添加POISON PILL元素,請問這是否正確,還是我說愚蠢的事情? –

+0

@AndreiBodnarescu你說的沒錯,我以爲我全部都是t下襬但只有固定一個:) –

0

有一些可以使用的策略,但一個簡單的一個是有任務的子類,標誌着這項工作的結束。製作人不直接發送此信號。相反,它會排隊該任務子類的一個實例。當你的一個消費者拉出這個任務並執行它時,會導致信號被髮送。

14

我派工人特殊的工作包的信號,他們應該關閉:

public class ConsumerWorker implements Runnable{ 

private static final Produced DONE = new Produced(); 

private BlockingQueue<Produced> inputQueue; 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    for (;;) { 
     try { 
      Produced item = inputQueue.take(); 
      if (item == DONE) { 
       inputQueue.add(item); // keep in the queue so all workers stop 
       break; 
      } 
      // process `item` 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

}

要停止的工人,只需添加ConsumerWorker.DONE到隊列中。

1

在您嘗試從隊列中檢索元素的代碼塊中,使用poll(time,unit)而不是take()

try { 
    Object queueElement = inputQueue.poll(timeout,unit); 
    //process queueElement   
} catch (InterruptedException e) { 
     if(!isRunning && queue.isEmpty()) 
     return ; 
} 

通過指定超時適當的值,可以確保線程不會繼續阻止的情況下,存在的

  1. isRunning一個不幸的序列是真的
  2. 隊列爲空,所以線程回車阻塞等待(如果使用take()
  3. isRunning設置爲假
+0

是的,這是我嘗試過的選擇之一,但它有一些缺點:首先,我做了很多不必要的調用poll(),然後當這樣做時(!isRunning && queue.isEmpty())加上隊列中的東西我必須將它們全部同步到一個同步塊,這是多餘的,因爲BlockingQueue已經自己處理所有這些。 –

+0

爲了避免第一個問題,爲什麼不只是將'isRunning'設置爲false的線程在等待'take()'調用的線程上發送中斷? catch塊仍然以同樣的方式工作 - 這不需要單獨的同步 - 除非你打算將isRunning'從false設置爲true .. – Bhaskar

+0

我也嘗試過,向你的線程發送中斷。問題是,如果你的線程是由一個ExecutorService(像一個FixedThreadPool)啓動的,當你執行executorService.shutDown()時,你的所有線程都將收到InterruptedException,這將使它們在任務中途停下來(因爲它們現在被裝備InterruptedException作爲阻止者)。此外,通過拋出的例外進行溝通並不是非常有效的。 –

0

我不得不使用多線程生產者和多線程使用者。 我結束了一個Scheduler -- N Producers -- M Consumers方案,每兩個通過一個隊列(總共兩個隊列)進行通信。調度程序用請求產生數據填充第一個隊列,然後用N個「毒丸」填充它。有一個活躍生產者的計數器(原子詮釋),最後一個生產者收到最後一個毒丸向消費者隊列發送M毒藥。

0

我們不能用CountDownLatch來做到這一點,其中大小是生產者中記錄的數量。並且每個消費者將在處理完成後記錄countDown。當所有任務完成時,它將穿過awaits()方法。然後停止所有的消費者。隨着所有記錄的處理。

相關問題