2013-10-11 50 views
1

我對併發感到困惑 - 我試圖阻止消費者線程運行,如果生產者已關閉,但如果消費者在take()上被阻塞時遇到問題。我曾嘗試使用布爾標誌添加一個posion藥丸,暫停當前線程,但仍無濟於事。併發問題 - 阻塞隊列

請有人可以幫助建議我去哪裏錯了。謝謝。

public class TestPoisonPill implements Runnable { 
    private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1); 
    private volatile boolean stopped = false; 

    public void addToQueue(String event) throws InterruptedException{ 
     System.out.println("in add to queue"); 
     if(event != null){ 
      try { 
       queue.put(event); 
      } catch (InterruptedException e) { 
       stopped = true; 
       queue.put("Poison"); 
       System.out.println("Unable to add the event to the queue, order routing processing is stopped"); 
       throw e; 
      } 
     } 
    } 

    @Override 
    public void run() { 
     while(!stopped){ 
      try { 
       if(queue.size() > 0){ 
        String string = queue.take(); 
        System.out.println("taken " + string + "from the queue"); 
       }else{ 
        continue; 
       } 
      } 
      catch (InterruptedException e) { 
       stopped = true; 
      } 
     } 
    } 

    public boolean isStopped(){ 
     return stopped; 
    } 

    protected BlockingQueue<String> getQueue() { 
     return queue; 
    } 

    protected void setBoolean(boolean b){ 
     this.stopped = b; 
    } 

    public static void main(String[] args) throws InterruptedException{ 
     ExecutorService exec = Executors.newSingleThreadExecutor(); 
     final TestPoisonPill t = new TestPoisonPill(); 
     exec.execute(t); 
     ExecutorService exec2 = Executors.newSingleThreadExecutor(); 
     Runnable addTask = new Runnable() { 
      public void run() { 
       while (true) { 
        try { 
         t.addToQueue("hi"); 
         Thread.sleep(100); 
        } catch (InterruptedException ex) { 
         System.out.println("add task interrupted "); 
         t.setBoolean(true); 
         break; 
        } 
       } 
      } 
     }; 
     exec2.execute(addTask); 
     Thread.sleep(1000); 
     exec2.shutdownNow(); 
    } 
} 

回答

3

很困惑併發 - 我試圖從運行如果生產者是關機停止消費者線程但我有問題,如果消費者被擋在採取()

如果您問題在於你的程序沒有停止,我認爲你在第一個ExecutorService上丟失了exec.shutdownNow()。這會打斷你的第一個線程,如果你改變你的循環是這樣的:

while (!stopped && !Thread.currentThread().isInterrupted()) { 

沒有中斷標誌檢查所有中斷將不會被線程看到。中斷只是在線程上設置的標誌。某些方法(如Thread.sleep(...)BlockingQueue.take())在線程中斷但您的客戶正在旋轉且從未呼叫take()時拋出InterruptedException

真的,消費者的旋轉循環是一個非常糟糕的模式。它應該只需撥打queue.take(),然後使用中斷讓您的製片人實際提交毒藥。是這樣的:如果你正在使用適當的中斷

while (!Thread.currentThread().isInterrupted()) { 
    String string; 
    try { 
     string = queue.take(); 
    } catch (InterruptedException e) { 
     break; 
    } 
    // here is where you could check for a poison pill 
    // something like: if (string == STOP_PILL) break; 
    System.out.println("taken " + string + "from the queue"); 
} 

你並不真正需要的stopped標誌。

你提到嘗試過「中毒丸」。對於其他人來說,當你在隊列中放置一個特定的「特殊」對象時,一箇中毒丸是消費者用來知道何時關閉的。像下面這樣的東西應該工作:

private static final String STOP_PILL = "__STOP_PLEASE!!__"; 
... 
// the consumer removes from the queue 
String string = queue.take(); 
// it tests to see if it a pill, == is better than .equals here 
if (string == STOP_PILL) { 
    // the consumer should stop 
    break; 
} 
... 
// to stop the consumer, the producer puts the pill into the queue 
queue.put(STOP_PILL); 

最後,您使用的是2種ExecutorService情況下,您可以輕鬆地使用一個。我想這裏的重點只是打斷其中的一個,但僅供參考。您可以使用一個Executors.newCachedThreadPool()這將創建您需要的線程數。

+0

你好我正在使用一箇中毒丸,我刪除,因爲它沒有工作!我不明白的是 - 爲什麼兩個執行者都需要關閉,因爲run方法肯定應該退出? – Biscuit128

+0

只需添加一些示例代碼@Bincuit128。 – Gray

+0

對不起@餅乾128,我沒有看到你的問題。在將最後一項任務提交給執行程序服務後,您需要關閉該程序。否則,它會永遠等待,看看是否要提交更多任務。 'shutdown()'只停止正在運行的新任務 - 它允許現有任務運行直到完成。 'shutdownNow()'刪除所有等待中的任務並中斷正在運行的任務。 – Gray

0

您從未關閉您的exec執行程序,只有exec2,因此運行您的TestPoisonPill的線程永遠不會中斷。