2017-06-29 37 views
0

有沒有辦法通過一個龐大的數據庫併爲條目平臺應用一些作業? 我試着用ExecutorService的,但我們必須爲了知道池大小關閉()...Java - ExecutorService具有最大大小

所以我的最好的解決辦法是:

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestCode 
{ 
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{ 
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29"); 
} 

public static void main(String args[]) throws Exception 
{ 
    int dbOffset = 0; 
    int nbOfArticlesPerRequest = 100; 
    int MYTHREADS = 10; 
    int loopIndex = 0; 
    boolean bContinue=true; 
    Runnable worker; 



    while(bContinue) // in this loop we'll constantly fill the pool list 
    { 
     loopIndex++; 
     ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN... 

     List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
     for(String id: ids) { 
      worker = new MyRunnable(id); 
      executor.execute(worker); 
     } 

     executor.shutdown(); 
     while (!executor.isTerminated()) { 
      System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
        " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
      ); 
      TimeUnit.MILLISECONDS.sleep(500); 
     } 

     if(loopIndex>=3) { 
      System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
      bContinue = false; 
     } 
     dbOffset+=nbOfArticlesPerRequest; 
    } 
} 



public static class MyRunnable implements Runnable { 

    private final String id; 

    MyRunnable(String id) { 
     this.id = id; 
    } 

     @Override 
     public void run() 
     { 
      System.out.println("Thread '"+id+"' started"); 
      try { 
       TimeUnit.MILLISECONDS.sleep(2000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      System.out.println("Thread '"+id+"' stopped"); 
     } 
    } 
} 

這是工作正常,但美中不足的是,在循環的每一端,我都需要等待最後一個線程完成。

例如爲:當只有3個線程正在運行...

我做了如下爲了解決這個問題,但就是「安全」 /是否正確?

順便說一句:有什麼方法可以知道隊列中有多少個任務/線程?

int dbOffset = 0; 
    int nbOfArticlesPerRequest = 5; //100; 
    int MYTHREADS = 2; 
    int loopIndex = 0; 

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE** 
     while(bContinue) // in this loop we'll constantly fill the pool list 
     { 
      loopIndex++; 

      List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
      for(String id: ids) { 
        worker = new MyRunnable(id); 
        executor.execute(worker); 
      } 

      while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) { 
       System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
         " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
       ); 
       TimeUnit.MILLISECONDS.sleep(500); 
      } 

      if(loopIndex>=3) { 
       System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
       bContinue = false; 
      } 
      dbOffset+=nbOfArticlesPerRequest; 
     } 

    executor.shutdown(); 
    // Wait until all threads are finish 
    while (!executor.isTerminated()) { 
     System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
       " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
     ); 
     TimeUnit.MILLISECONDS.sleep(500); 
    } 

編輯:

我嘗試推出1級或10數以百萬計的任務,所以(我認爲),我不能把他們都在排隊......這就是爲什麼我使用一個全球性的執行爲了能夠在隊列中總是有一些線程(因爲我不能關閉執行程序,否則它不再可用)。

最新代碼版本:

int dbOffset = 0; 
    int nbOfArticlesPerRequest = 5; //100; 
    int MYTHREADS = 2; 
    int loopIndex = 0; 

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE** 
     while(bContinue) // in this loop we'll constantly fill the pool list 
     { 
      loopIndex++; 

      List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
      for(String id: ids) { 
        worker = new MyRunnable(id); 
        executorPool.execute(worker); 
      } 

      while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2)) 
      { 
       System.out.println("Pool size is now " + executorPool.getActiveCount()+ 
             " - queue size: "+ executorPool.getQueue().size() 
       ); 

       if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) { 
        System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue"); 
        break; 
       } 

       TimeUnit.MILLISECONDS.sleep(2000); 
      } 

      if(loopIndex>=3) { 
       System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
       bContinue = false; 
      } 
      dbOffset+=nbOfArticlesPerRequest; 
     } 

    executorPool.shutdown(); 
    // Wait until all threads are finish 
    while (!executorPool.isTerminated()) { 
     System.out.println("Pool size is now " + executorPool.getActiveCount()+ 
       " - queue size: "+ executorPool.getQueue().size() 
     ); 
     TimeUnit.MILLISECONDS.sleep(500); 
    } 

在此先感謝

+0

可以使用的invokeAll()來等待線程的完成。參考:https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/36699136#36699136 –

回答

0

更新

現在很明顯,我認爲你的主要關注它,你不能在提交千萬任務一旦。

不要害怕,你可以把它們全部提交給執行者。並行運行的實際任務數量受底層線程池大小的限制。也就是說,如果你的池大小爲2,那麼當時只有兩個任務正在執行,其餘的則在隊列中等待空閒線程。

默認情況下,Executors.newFixedThreadPool()創建一個隊列大小爲Integer.MAX_VALUE的執行程序,因此您的數百萬個任務將適合此處。


您可以使用ExecutorService.submit()方法返回Future。然後,您可以檢查未來任務的狀態(即使用isDone(),isCancelled()方法)。

執行程序通常是您不希望顯式關閉並且在整個應用程序生命週期中存在的東西。通過這種方法,您無需關閉以瞭解有多少任務正在處理中。

List<Future<?>> tasks = new ArrayList<>(); 
for (String id : ids) { 
    Future<?> task = executorService.submit(() -> { 
     // do work 
    }); 
    tasks.add(task); 
} 

long pending = tasks.stream().filter(future -> !future.isDone()).count(); 
System.out.println(pending + " task are still pending"); 

而且,請注意,任務和線程是不能互換的條款。在你的情況下,執行者具有固定的線程數。您可以提交比此更多的任務,但其餘部分將位於執行程序隊列中,直到有空閒線程運行任務爲止。

+0

可能是一個好主意......然後我只需要添加一個「等待循環」,以便在少於X任務運行時分配更多任務... – Bast

+0

與@Pavan相同的問題,與我的解決方案相比,您的解決方案有什麼優勢(請參閱EDIT之後的最新代碼)? – Bast

+0

好吧,現在我明白了你的觀點並更新了答案。我認爲你正在努力模擬Executors已經提供的東西--_queue_。 –

0

ExecuterService允許您調用可並行運行的任務列表,並在結果可用時返回結果。

在你的代碼使用

worker = new MyRunnable(id); 
executor.execute(worker); 

相反Runnable,它能夠更好地使用Callable在這種使用情況下,那麼你可以提交可調用的列表執行單一的API而不是爲循環。

List<Callable> workers = new ArrayList<>(); 
workers.add(new MyCallable(id)) // this is just for example 
workers.add(new MyCallable(id)) 
workers.add(new MyCallable(id)) 

List<Future<Boolean>> futures = executor.invokeAll(workers); // this will execute all worker tasks parallely and return you future object list using which you can see whether worker thread is completed or not and also the what is the return value. 

注意未來對象的get方法阻塞調用

+0

invokeAll也是阻塞的,所以最初的問題(需要等待每個循環的最後一個線程)沒有解決。 :) – Bast

+0

@Bast - 根據我的理解,invokeAll不阻止呼叫。 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection) – Pavan

+0

你是對的,只有future.get()正在阻止...我可以使用您的解決方案作爲由@DavidSiro建議的解決方案... – Bast

0

你不需要知道線程池的大小,檢查任務的完成在ExecutorService。提交任務後,您可以刪除您的代碼。

選項1:

  1. newWorkStealingPool從執行人替換的ThreadPoolExecutor。

    使用所有可用的處理器創建工作線程池作爲其目標並行級別。

    它可以更好地利用ExecutorService中的線程。

    ExecutorService executor = Executors.newWorkStealingPool(); 
    
  2. 使用invokeAll

選項2:(有用的,如果你知道任務提前數)

使用CountDownLatch並初始化計數器任務數是提交。

更多參考:

wait until all threads finish their work in java

How to properly shutdown java ExecutorService

+0

是的,但因爲它是一個while循環我想動態添加新線程,以便始終有一些在「隊列」 ...實際上使用getActiveCount()更正確(代碼更新) - 我現在甚至切換到我的本地代碼中的ThreadPoolExecutor – Bast

+0

請注意,在我的第二個代碼部分(即:「解決方案」)中,ExecutorService是全局的,所以它不能被關閉,否則它不再可用 – Bast

+0

在while循環之外,可以按照上面引用的順序使用shutdown,sbutdownNow,awaitTermination API來保持關閉代碼。 –