2012-01-04 27 views
0

我有一種情況,我有2個阻塞隊列。第一個插入一些我執行的任務。每個任務完成後,它會將任務添加到第二個隊列中,並在其中執行。Java BlockingQueue在take()上阻塞,稍微扭曲

所以我的第一個隊列很簡單:我只是檢查,以確保它不是空的,執行,否則我打斷():

public void run() { 
    try { 
     if (taskQueue1.isEmpty()) { 
      SomeTask task = taskQueue1.poll(); 
      doTask(task); 
      taskQueue2.add(task); 
     } 
     else { 
      Thread.currentThread().interrupt(); 
     } 
    } 

    catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 
} 

第二個我這樣做,正如你所知道的,不工作:

public void run() { 
    try { 
     SomeTask2 task2 = taskQueue2.take(); 
     doTask(task2); 
    } 

    catch (InterruptedException ex) { 

    } 
    Thread.currentThread().interrupt(); 

} 

你將如何解決這個問題,使得第二的BlockingQueue不上取(塊),但完成,只有當它知道沒有添加更多的項目。如果第二個線程可能會看到第一個阻塞隊列,並且檢查它是否爲空,並且第二個隊列也是空的,那麼它會很好,然後它會中斷。

我也可以使用Poison對象,但寧願選擇別的。

注意:這不是確切的代碼,只是我在這裏寫下:

+1

我無法弄清楚你在這裏要做什麼。請解釋。 – 2012-01-04 01:05:27

回答

1

您聽起來好像處理第一個隊列的線程知道在其隊列耗盡後不會再有任務進入。這聽起來很可疑,但我會隨時聽取你的意見並提出解決方案。

定義兩個線程均可見的AtomicInteger。初始化爲正數

定義第一線程的操作如下:

  • 環路上Queue#poll()
  • 如果Queue#poll()返回null,則在共享整數上調用AtomicInteger#decrementAndGet()
    • 如果AtomicInteger#decrementAndGet()返回零,則通過Thread#interrupt()中斷第二個線程。 (這處理了沒有物品到達的情況。)
    • 無論哪種情況,退出循環。
  • 否則,處理提取的項目,在共享整數上調用AtomicInteger#incrementAndGet(),將提取的項目添加到第二個線程的隊列中,並繼續循環。

定義線程的操作如下:

  • 環路阻塞BlockingQueue#take()
  • 如果BlockingQueue#take()引發InterruptedException,發現異常,請致電Thread.currentThread().interrupt(),並退出循環。
  • 否則,處理提取的項目。
  • 在共享整數上調用AtomicInteger#decrementAndGet()
    • 如果AtomicInteger#decrementAndGet()返回零,則退出循環。
    • 否則,繼續循環。

確保您瞭解嘗試寫實際的代碼之前的想法。合約是第二個線程繼續等待隊列中的更多項目,直到預期任務的計數達到零。此時,生產線程(第一個)不再將任何新項目推送到第二個線程的隊列中,因此第二個線程知道停止爲其隊列服務是安全的。

沒有任務到達第一個線程的隊列時,就會出現扭曲的情況。由於第二個線程僅在之後遞減並測試的計數,所以它處理物品,如果它從來沒有機會處理任何物品,則它不會考慮停止。我們使用線程中斷來處理這種情況,代價是第一個線程循環終止步驟中的另一個條件分支。幸運的是,該分支只能執行一次。

有許多設計可以在這裏工作。我只描述了一個只引入了一個額外的實體—的共享原子整數—,但即使如此,它很費勁。我認爲使用毒丸會更清潔,但我確實承認Queue#add()BlockingQueue#put()均不接受null作爲有效元素(由於Queue#poll()的返回值合同)。否則將很容易使用null作爲毒丸

+0

謝謝。我將嘗試首先使用毒藥丸 – 2012-01-04 02:59:16

+0

我不知道我可以使用毒藥丸,因爲有多個消耗隊列。 – 2012-01-04 03:35:42

+0

@ DominicBou-Samra - 如果你解釋了你的代碼試圖達到什麼目的,也許有人可以幫助你以更明智的方式做到這一點。 – 2012-01-04 04:14:43

2

我想不出什麼你實際上是試圖在這裏做的,但我可以說,interrupt()在你第一次run()方法是無意義的或錯誤的。

  • 如果您正在運行在自己的Thread對象run()方法,那麼線程大概要退出,所以沒有點中斷它。

  • 如果您正在一個帶有線程池的執行程序中運行run()方法,那麼您很可能不希望終止該線程或關閉執行程序......此時。如果你想關閉執行程序,那麼你應該調用其中一種關閉方法。


例如,這裏有一個版本什麼呢,你似乎怎麼做,而不全部中斷的東西,沒有線程創建/銷燬流失。

public class TaskExecutor { 

    private ExecutorService executor = new ThreadPoolExecutorService(...); 

    public void submitTask1(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask(task); 
       submitTask2(task); 
      } 
     }); 
    } 

    public void submitTask2(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask2(task); 
      } 
     }); 
    } 

    public void shutdown() { 
     executor.shutdown(); 
    } 
} 

如果您想爲任務單獨排隊,只需創建並使用兩個不同的執行程序即可。