2016-08-09 87 views
1

下面是我的線程池ThreadPoolExecutor的規模和管理線程

<bean id="executorServiceThreadPool" 
     class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> 
     <property name="corePoolSize" value="100" /> 
     <property name="maxPoolSize" value="500" /> 
     <property name="keepAliveSeconds" value="60" /> 
     <property name="WaitForTasksToCompleteOnShutdown" value="true" /> 
    </bean> 

配置和我在一個創建線程的循環並執行它們放在一起。

for(int i=0;i<sublist.size();i++) { 
    Callable<Object> secDataTask = createTask(businessDate, sublist.get(i)); 
    taskList.put(SecYieldConstants.SECURITY_SEC_TASK+i, secDataTask); 
} 
Map<Callable<Object>,Object> taskResult = 
    commonTaskExecutor.executeTasksConcurrently(MAX_TIMEOUT, taskList); 

名單通常是10-20K,並在列表中的每個項目有越來越創造了約10個任務。

當我用默認設置執行它,我要麼得到InterruptedException或得到IndexOutOfBoundException。

但是如果我創造的唯一的,它工作正常5-10子列表,

應我的池大小是在這種情況下怎麼辦? 還是有辦法可以讓JVM處理我的池大小

代碼創建任務:

private Callable<Object> createTask(final Date businessDate,final Object obj) { 
     Callable<Object> securitySecTask = new Callable<Object>() { 
      public Object call() throws Exception { 
       Object result = retrieveInnerResult(businessDate,obj) 
       return result; 
      } 
     }; 
     return securitySecTask; 
    } 


private Callable<Object> retrieveInnerResult(final Date businessDate,final Object obj) { 
     Callable<Object> securitySecTask = new Callable<Object>() { 
      public Object call() throws Exception { 
       Object result = retrievetask2Result(businessDate,obj) 
       return result; 
      } 
     }; 
     return securitySecTask; 
    } 
+0

你能提供堆棧跟蹤嗎?和'ceateTask'的代碼?我不相信這是'Executor'的問題。 – bradimus

+0

IndexOutOfBoundException最可能是由您的代碼中的錯誤引起的,而不是由線程池的配置引起的。閱讀堆棧跟蹤,找到並修復它。 –

+0

在java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1038) \t在java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326) \t在java.util.concurrent中。 FutureTask $ Sync.innerGet(FutureTask.java:257) \t at java.util.concurrent.FutureTask.get(FutureTask.java:119) –

回答

1

IndexOutOfBoundException誤差可能來自於一個特定的指數增加taskList元素(或者在某處代碼!),並且與線程池無關。

關於線程池,100作爲核心大小很大,500作爲最大是非常大的!代碼很可能運行在沒有那麼多內核的cpu上。因此,除非正在執行的代碼嚴重阻塞(如等待Web查詢),否則擁有那麼多線程可能會減慢整體執行速度。嘗試減少線程數量以查看程序如何反應。

關於InterruptedException,可能是因爲您正在嘗試關閉池,並且某些正在執行的任務可能會被中斷並拋出異常。很難說,沒有更多的信息。

我希望這會有所幫助。

+0

感謝您的迴應。這裏是堆棧跟蹤,ifi減少池大小並執行它給我一個InterruptedException,在java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1038) \t在java.util.concurrent.locks.AbstractQueuedSynchronizer。 tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326) \t at java.util.concurrent.FutureTask $ Sync.innerGet(FutureTask。java:257) \t at java.util.concurrent.FutureTask.get(FutureTask.java:119) –

0

如果我只向任務列表添加較少的任務,任務就會毫無問題地執行。 這是我使用的代碼。

/** 
    * @param timeout 
    * @param Map<String,Callable<Object>> taskExecutionList 
    * @return Map<Callable<Object>, Object> 
    * @throws Exception 
    */ 
    public Map<Callable<Object>, Object> executeTasks(Long timeout, Map<String, Callable<Object>> taskExecutionList) throws Exception{ 
      //Execute all valid tasks and await termination (all tasks to be complete or timeout as passed by caller) 
      List<Future<Object>> taskResult = executorServiceThreadPool.invokeAll(taskExecutionList.values(), timeout, TimeUnit.SECONDS); 
      return retrieveTaskResult(taskExecutionList, taskResult); 
    } 


    /** 
    * @param taskset 
    * @param executedTasks 
    * @return Map<Callable<Object>, Object> 
    * @throws Exception 
    */ 
    private Map<Callable<Object>, Object> retrieveTaskResult(Map<String,Callable<Object>> taskset, List<Future<Object>> executedTasks) throws Exception { 
     Map<Callable<Object>, Object> result = new LinkedHashMap<Callable<Object>, Object>(); 
     int taskCount = 0; 

     //populate result map with callable and respective results from Future 
     for(Map.Entry<String, Callable<Object>> task : taskset.entrySet()){ 
      Callable<Object> callableTask = task.getValue(); 
      Future<Object> taskResult = executedTasks.get(taskCount); 
       if(!taskResult.isCancelled() && taskResult.isDone()){ 
        result.put(callableTask, taskResult.get()); 
       } 
      taskCount++; 
     } 
     return result; 
    }