6

我認爲使用ThreadPoolExecutor我們可以提交Runnable s執行或者在構造函數中傳遞的BlockingQueue或使用execute方法。
另外我的理解是,如果任務可用,它將被執行。
什麼我不明白的是:ThreadPoolExecutor和隊列

public class MyThreadPoolExecutor { 

    private static ThreadPoolExecutor executor; 

    public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){ 
     executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue); 
     //executor.prestartAllCoreThreads(); 
    } 

    public static void main(String[] main){ 
     BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); 
     final String[] names = {"A","B","C","D","E","F"}; 
     for(int i = 0; i < names.length; i++){ 
      final int j = i; 
      q.add(new Runnable() { 

       @Override 
       public void run() { 
        System.out.println("Hi "+ names[j]); 

       } 
      });   
     } 
     new MyThreadPoolExecutor(10, 20, 1, q); 
     try { 
      TimeUnit.SECONDS.sleep(5); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     /*executor.execute(new Runnable() { 

      @Override 
      public void run() { 

       System.out.println("++++++++++++++"); 

      } 
     }); */ 
     for(int i = 0; i < 100; i++){ 
      final int j = i; 
      q.add(new Runnable() { 

       @Override 
       public void run() { 
        System.out.println("Hi "+ j); 

       } 
      }); 
     } 


    } 


} 

此代碼沒有絕對任何事情,除非我要麼取消註釋構造函數中的executor.prestartAllCoreThreads();或我稱之爲可運行的execute,打印System.out.println("++++++++++++++");(它也評論出)。

爲什麼?
報價(我的重點):

默認情況下,即使是最初創建核心線程和纔開始 當新任務到達,但可以使用動態 方法prestartCoreThread()或prestartAllCoreThreads覆蓋() 。如果您使用非空 隊列構建池,您可能需要 預啓動線程。

好的。所以我的隊列不是空的。但是我創建了executor,我做了sleep,然後我將新的Runnable s添加到隊列中(在循環中爲100)。
這個循環不計爲new tasks arrive
爲什麼它不起作用,我必須要麼prestart要麼明確地致電execute

+1

工作線程是在任務通過'execute'到達時產生的,而這些線程是與底層工作隊列交互的。如果您從非空工作隊列開始,則需要預先啓動工作人員。請參閱[在OpenJDK 7中的實現](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ThreadPoolExecutor.java)。 – oldrinb 2012-08-05 21:17:57

+0

@veer:是的,我知道。但是我從一個非空隊列開始,然後開始添加到隊列中。爲什麼我需要預先啓動工人?爲什麼我需要每次都查看實施情況?規範是錯誤的或者我不理解它。那它是哪一個? – Cratylus 2012-08-05 21:18:59

+0

同樣,工作人員是直接與隊列交互的人員。它們僅在通過'execute'(或其上面的圖層,例如'invokeAll','submit'等)傳遞時才按需生成。查看'execute'的代碼[here](http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7-b147/java/util/concurrent/ThreadPoolExecutor.java#1300 )。 – oldrinb 2012-08-05 21:21:35

回答

9

工作線程是在任務到達時通過執行產生的,而這些線程是與底層工作隊列交互的線程。如果您從非空工作隊列開始,則需要預先啓動工作人員。請參閱implementation in OpenJDK 7

我重複一次,工作人員是與工作隊列互動的人員。它們僅在通過execute傳遞時才按需求生成。 (或者它上面的圖層,例如invokeAll,submit等)如果它們沒有啓動,那麼添加到隊列中的工作量是無關緊要的,因爲沒有任何將其檢查爲沒有工人啓動

ThreadPoolExecutor不產生工作線程,直到必要或如果您通過方法prestartAllCoreThreadsprestartCoreThread搶先創建它們的創建。如果沒有工人開始工作,那麼排隊工作就沒有辦法完成。

添加初始execute工程的原因是,它強制創建一個核心工作線程,然後可以開始處理隊列中的工作。您也可以致電prestartCoreThread並收到類似的行爲。如果要啓動全部工作人員,則必須致電prestartAllCoreThreads或通過execute提交該任務的數量。

請參閱下面的execute的代碼。

/** 
* Executes the given task sometime in the future. The task 
* may execute in a new thread or in an existing pooled thread. 
* 
* If the task cannot be submitted for execution, either because this 
* executor has been shutdown or because its capacity has been reached, 
* the task is handled by the current {@code RejectedExecutionHandler}. 
* 
* @param command the task to execute 
* @throws RejectedExecutionException at discretion of 
*   {@code RejectedExecutionHandler}, if the task 
*   cannot be accepted for execution 
* @throws NullPointerException if {@code command} is null 
*/ 
public void execute(Runnable command) { 
    if (command == null) 
     throw new NullPointerException(); 
    /* 
    * Proceed in 3 steps: 
    * 
    * 1. If fewer than corePoolSize threads are running, try to 
    * start a new thread with the given command as its first 
    * task. The call to addWorker atomically checks runState and 
    * workerCount, and so prevents false alarms that would add 
    * threads when it shouldn't, by returning false. 
    * 
    * 2. If a task can be successfully queued, then we still need 
    * to double-check whether we should have added a thread 
    * (because existing ones died since last checking) or that 
    * the pool shut down since entry into this method. So we 
    * recheck state and if necessary roll back the enqueuing if 
    * stopped, or start a new thread if there are none. 
    * 
    * 3. If we cannot queue task, then we try to add a new 
    * thread. If it fails, we know we are shut down or saturated 
    * and so reject the task. 
    */ 
    int c = ctl.get(); 
    if (workerCountOf(c) < corePoolSize) { 
     if (addWorker(command, true)) 
      return; 
     c = ctl.get(); 
    } 
    if (isRunning(c) && workQueue.offer(command)) { 
     int recheck = ctl.get(); 
     if (! isRunning(recheck) && remove(command)) 
      reject(command); 
     else if (workerCountOf(recheck) == 0) 
      addWorker(null, false); 
    } 
    else if (!addWorker(command, false)) 
     reject(command); 
} 
+0

您沒有回答與規範有關的OP。並且您通過將JVM實現的'one'的實現細節(對於Oracle或IBM等可能不同)進行回答 – Cratylus 2012-08-05 21:26:32

+0

'在執行任務到達時產生工作線程'。另外,如果工作線程啓動,我可以直接將任務添加到隊列中,而無需調用'execute' – Cratylus 2012-08-05 21:29:14

+0

我非常明白地將一個實現的實現細節(實際上是Oracle JDK中的_same_)。 OP就是爲什麼代碼能夠正常工作,並且我解釋了它。 'ThreadPoolExecutor'不會派生工作線程直到必要或者如果你通過方法[prestartAllCoreThreads](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor .html#prestartAllCoreThreads())和[prestartCoreThread](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#prestartCoreThread())。 – oldrinb 2012-08-05 21:31:43

5

BlockingQueue不是神奇的線程調度程序。如果將Runnable對象提交給隊列,並且沒有正在運行的線程來使用這些任務,那麼它們當然不會被執行。另一方面,如果需要,執行方法將根據線程池配置自動分配線程。如果您預先啓動所有核心線程,則會有線程從隊列中消耗任務。

相關問題