2011-08-11 138 views
7

我對生產者 - 消費者模式的理解是,它可以使用生產者和消費者之間共享的隊列來實現。生產者將工作提交給共享隊列,消費者檢索並處理它。它也可以由生產者直接提交給消費者實施(生產者線程直接提交給消費者的執行者服務)。生產者消費者 - 使用Executors.newFixedThreadPool

現在,我一直在尋找提供線程池的一些常見實現的Executors類。根據規範,newFixedThreadPool方法「重用固定數量的線程,在共享的無界隊列中運行」。他們在這裏談論哪個隊列?

如果生產者直接向消費​​者提交任務,它是包含Runnables列表的ExecutorService的內部隊列嗎?

或者它是中間隊列,以防生產者提交到共享隊列?

可能是我錯過了整點,但有人請澄清?

回答

4

你是對的,ExecutorService不僅是一個線程池,它是一個完整的生產者 - 消費者實現。這個內部隊列實際上是一個線程安全的隊列Runnable(確切地說是FutureTask),它持有你的任務submit()

池中的所有線程在該隊列上被阻塞,等待任務被執行。當你有一個任務時,恰好有一個線程會把它拿起來並運行它。當然submit()不等待池中的線程完成處理。

在另一方面,如果你提交的任務(或長期運行的)的數量巨大,你可能最終向上與被佔用池中的所有線程和一些任務在隊列中等待。一旦任何線程完成其任務,它將立即從隊列中選擇第一個線程。

+0

只是爲了澄清:'ExecutorService'只是一個接口。你可以在一個線程中執行'ExecutorService'類,它只要運行一個可運行的類就可以運行每個可運行的類(我相信'java.util.concurrent'包中有一個實現就是這麼做的) 。但在實踐中*,大多數Exec​​utorService實現是完整的生產者 - 消費者實現。 –

+0

你是絕對正確的,通過'ExecutorService''我的意思是「*執行'ExecutorService' *」的'Executors.newFixedThreadPool()'返回的東西。感謝您的澄清。 –

+1

謝謝你們。所以,如果我創建使用的newFixedThreadPool(8),然後就可以執行大約1000可運行任務的執行服務,請確認一下我的情況的理解: 1.最多8個線程將在處理開始創建 2 ,而8個線程正忙,992個任務將被保存在內部隊列中 3.另外,因爲它是一個無界隊列,所以我可以提交給執行程序服務的任務數量沒有上限。 如果我使用有界隊列創建ExecutorService,上述場景會產生什麼影響?它會表現更好嗎? 謝謝,O. – Oxford

1
public class Producer extends Thread { 
    static List<String> list = new ArrayList<String>(); 

    public static void main(String[] args) { 
     ScheduledExecutorService executor = Executors 
       .newScheduledThreadPool(12); 
     int initialDelay = 5; 
     int pollingFrequency = 5; 
     Producer producer = new Producer(); 
     @SuppressWarnings({ "rawtypes", "unused" }) 
     ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay( 
       producer, initialDelay, pollingFrequency, TimeUnit.SECONDS); 
     for (int i = 0; i < 3; i++) { 
      Consumer consumer = new Consumer(); 
      @SuppressWarnings({ "rawtypes", "unused" }) 
      ScheduledFuture schedFutureConsumer = executor 
        .scheduleWithFixedDelay(consumer, initialDelay, 
          pollingFrequency, TimeUnit.SECONDS); 
     } 

    } 

    @Override 
    public void run() { 
     list.add("object added to list is " + System.currentTimeMillis()); 
           ///adding in list become slow also because of synchronized behavior 
    } 
} 

class Consumer extends Thread { 

    @Override 
    public void run() { 
     getObjectFromList(); 
    } 

    private void getObjectFromList() { 
     synchronized (Producer.list) { 
      if (Producer.list.size() > 0) { 
       System.out.println("Object name removed by " 
         + Thread.currentThread().getName() + "is " 
         + Producer.list.get(0)); 
       Producer.list.remove(Producer.list.get(0)); 
      } 
     } 
    } 
}