7

我一起實現我處理管道的最佳方式摔跤。生產者/消費者工作隊列

我的飼料生產工作提高到一個BlockingQueue的。在消費者方面,我輪詢隊列,將我在Runnable任務中獲得的內容包裝起來,並將其提交給ExecutorService。

while (!isStopping()) 
{ 
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); 
    if (work == null) 
    { 
     break; 
    } 
    executorService.execute(new Worker(work)); // needs to block if no threads! 
} 

這並不理想;當然,ExecutorService有自己的隊列,所以實際發生的事情是我總是完全耗盡我的工作隊列並填充任務隊列,隨着任務完成,任務隊列逐漸清空。

我意識到,我可以在生產端排隊的任務,但我真的不想這樣做 - 我喜歡我的工作隊列爲啞弦的間接/隔離;這真的不是生產者的任何事情會發生在他們身上。強制生產者排隊Runnable或Callable打破抽象,恕我直言。

但我想共享的工作隊列代表當前處理狀態。如果消費者沒有跟上,我希望能夠阻止生產者。

我喜歡用執行人,但我覺得我打他們的設計。我可以部分喝Kool-ade,還是必須喝它?我是否在抵制排隊任務方面出錯? (我懷疑我可以將ThreadPoolExecutor設置爲使用1任務隊列並覆蓋它的執行方法來阻止而不是拒絕隊列滿,但這種感覺很糟糕。)

建議?

回答

14

我想要共享工作隊列 代表當前正在處理的 狀態。

嘗試使用共享BlockingQueue並使工作線程池從隊列中取出工作項。

我希望能夠阻止 生產者如果消費者不 跟上。

兩個ArrayBlockingQueueLinkedBlockingQueue支持有界隊列,使得它們在放置塊滿時。使用阻止put()方法可確保在隊列已滿時阻止生產者。

這裏是在艱難中起步。您可以調整工人和隊列大小的數目:

public class WorkerTest<T> { 

    private final BlockingQueue<T> workQueue; 
    private final ExecutorService service; 

    public WorkerTest(int numWorkers, int workQueueSize) { 
     workQueue = new LinkedBlockingQueue<T>(workQueueSize); 
     service = Executors.newFixedThreadPool(numWorkers); 

     for (int i=0; i < numWorkers; i++) { 
      service.submit(new Worker<T>(workQueue)); 
     } 
    } 

    public void produce(T item) { 
     try { 
      workQueue.put(item); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
    } 


    private static class Worker<T> implements Runnable { 
     private final BlockingQueue<T> workQueue; 

     public Worker(BlockingQueue<T> workQueue) { 
      this.workQueue = workQueue; 
     } 

     @Override 
     public void run() { 
      while (!Thread.currentThread().isInterrupted()) { 
       try { 
        T item = workQueue.take(); 
        // Process item 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        break; 
       } 
      } 
     } 
    } 
} 
+0

謝謝;我之前的實現很像這樣,儘管它只是使用了ThreadFactory--一旦你將它減少到一組固定的線程,這些線程都試圖耗盡工作隊列,那麼使用ExecutorService就沒有多大意義。我切換到ExecutorService以便利用更可調的線程池,其語義是「查找可用的現有工作線程(如果存在的話),如果有必要,創建一個線程,如果它們空閒,就殺掉它們。」 –

+0

Executors.newCachedThreadPool()將做類似的事情。您也可以真正調整ThreadPoolExecutor本身的池策略。你在追求什麼? – Kevin

+0

這就是這個想法......如果我願意使用它的任務工作隊列,它可以完全按照我喜歡的方式進行調整。我真正想要的是從執行程序中挖掘出線程池的智慧,並實現我自己的線程池客戶端,但它並不是真正爲此設置的。 –

0

你可以讓你的消費執行Runnable::run,而不是直接啓動一個新的線程起來的。把它和一個最大尺寸的阻塞隊列結合起來,我想你會得到你想要的。您的使用者成爲基於隊列上的工作項目內聯執行任務的工作人員。他們只會在消費者停止消費時以最快的速度出場。

+0

這隻會給一個工人,我想調整它以最大化給定系統配置的處理。處理作業將持續幾秒到幾十秒,並且是I/O綁定和CPU綁定工作的混合。 –

+0

我假設多個消費者與一個或多個生產者相關聯。如果你只有一個消費者,那麼爲什麼不讓生產者直接把工作轉交給執行者呢? –

1

「找到一個可用的現有工作線程(如果存在的話),創建一個,如果有必要的話,殺死它們,如果它們閒置。

管理所有這些工人狀態是沒有必要的,因爲它是危險的。我會創建一個監視器線程,不斷在後臺運行,誰的唯一任務是填補隊列併產生消費者......爲什麼不讓工作線程守護進程,以便他們完成後儘快死亡?如果將它們全部附加到一個ThreadGroup中,您可以動態地重新調整池大小...例如:

**for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
     spawnDaemonWorkers(queue.poll()); 
    }**