2009-07-08
1

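CompletionService: how to kill all threads and return the results after 5 seconds?

I have a problem with CompletionService. My task: parse about 300 HTML pages, wait at most 5 seconds for the results, and then return them to the main code. I decided to use CompletionService + Callable for this. The question is how to stop all the threads started through the CompletionService and return the results for the pages that were parsed successfully. I removed the println calls from this code, but I can say that 5 seconds is enough (there are good results by then, but the program waits for all threads to finish). My code takes about 2 minutes to run.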

My calling code:

Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>(); 
for (final String currentUrl : allUrls) { 
    solvers.add(new Callable<HCard>() { 
     public HCard call() throws ParserException { 
      HCard hCard = HCardParser.parseOne(currentUrl);      
      if (hCard != null) { 
       return hCard; 
      } else { 
       return null; 
      } 
     } 
    }); 
} 
ExecutorService execService = Executors.newCachedThreadPool(); 
Helper helper = new Helper(); 
List<HCard> result = helper.solve(execService, solvers); 
// then I do something with the result list 

The code being called:

public class Helper { 
List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers) throws InterruptedException { 
    CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e); 
    int n = solvers.size(); 

    Future<HCard> future = null; 
    HCard hCard = null; 
    ArrayList<HCard> result = new ArrayList<HCard>(); 

    for (Callable<HCard> s : solvers) { 
     cs.submit(s); 
    } 
    for (int i = 0; i < n; ++i) { 
     try { 
      future = cs.take(); 
      hCard = future.get(); 
      if (hCard != null) { 
       result.add(hCard); 
      } 
     } catch (ExecutionException e1) { 
      future.cancel(true); 
     } 
    } 
    return result; 
} 
} 

I tried using:

  • awaitTermination(5000,TimeUnit.MILLISECONDS)
  • future.cancel(true)
  • execService.shutdownNow()
  • future.get(5000,TimeUnit.MILLISECONDS);
  • TimeoutException: I could not get a TimeoutException to be thrown.

Please help me in the context of my code.
Thanks in advance!

+0

Is there a reason you are using a cached thread pool? In the worst case it will start 300 threads. Have you considered a FixedThreadPool? – akarnokd 2009-07-08 15:37:45

+1

I considered it. Looking at the profiler, it makes no difference for my code. – dementiev 2009-07-13 12:16:22

Answers

6

You need to make sure that the tasks you submit respond properly to interruption, i.e. they check Thread.isInterrupted() or are otherwise "interruptible".
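As a rough illustration of what "responding to interruption" can look like, here is a minimal sketch. The class InterruptibleTask and its step loop are hypothetical stand-ins, not the asker's HCardParser; the only point is that long-running work checks the interrupt flag between steps and returns early when it is set.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical task: works in small steps and stops as soon as it is interrupted.
class InterruptibleTask implements Callable<Integer> {
    private final int steps;

    InterruptibleTask(int steps) {
        this.steps = steps;
    }

    @Override
    public Integer call() {
        int done = 0;
        while (done < steps) {
            // Cooperative check: shutdownNow() and Future.cancel(true) only set this flag.
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            try {
                Thread.sleep(10); // stand-in for one unit of blocking work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                break;
            }
            done++;
        }
        return done; // number of steps finished before stopping
    }

    // Starts one very long task, interrupts the pool after waitMillis and
    // reports whether the pool terminated within two seconds.
    static boolean stopsPromptly(long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(new InterruptibleTask(100000)); // about 1000 s if never interrupted
        try {
            Thread.sleep(waitMillis);
            pool.shutdownNow(); // interrupts the worker thread
            return pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated promptly: " + stopsPromptly(100));
    }
}
```

A task that blocks in code which ignores interrupts (some blocking I/O, for example) would not stop this way, which is why the parser itself has to cooperate.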

I'm not sure you need a completion service for this.

ExecutorService service = ... 

// Submit all your tasks 
for (Task t : tasks) { 
    service.submit(t); 
} 

service.shutdown(); 

// Wait for termination 
boolean success = service.awaitTermination(5, TimeUnit.SECONDS); 
if (!success) { 
    // awaitTermination timed out, interrupt everyone 
    service.shutdownNow(); 
} 

At this point there is not much you can do if your worker objects do not respond to interruption.
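The snippet above stops the pool but does not show how to get back whatever finished in time. One possible way, sketched here under assumptions, is to keep the Future returned by each submit call and afterwards read only those that completed. The class TimedHarvest, the method names, and the String results (standing in for HCard) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TimedHarvest {
    // Submits all tasks, waits up to timeoutMillis, then returns only the
    // results of tasks that completed normally.
    static <T> List<T> collect(List<Callable<T>> tasks, int threads, long timeoutMillis) {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        List<Future<T>> futures = new ArrayList<Future<T>>();
        for (Callable<T> task : tasks) {
            futures.add(service.submit(task));
        }
        service.shutdown(); // accept no new tasks

        List<T> results = new ArrayList<T>();
        try {
            if (!service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                service.shutdownNow(); // timed out: interrupt running tasks, drop queued ones
            }
            for (Future<T> f : futures) {
                if (f.isDone() && !f.isCancelled()) {
                    try {
                        results.add(f.get()); // does not block, the task is already done
                    } catch (ExecutionException failed) {
                        // the task threw (possibly because it was interrupted); skip it
                    }
                }
            }
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return results;
    }

    // Two quick tasks and one that sleeps far longer than the timeout.
    static List<String> demo() {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> "fast-1");
        tasks.add(() -> { Thread.sleep(5000); return "slow"; });
        tasks.add(() -> "fast-2");
        return collect(tasks, 3, 500);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [fast-1, fast-2]
    }
}
```

Results come back in submission order, and the slow task is simply left out rather than waited for.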

0

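I have never used CompletionService, but I'm sure there is a poll(long timeout, TimeUnit unit) call that does a bounded wait. Then check the result for null. Measure the time spent waiting and stop waiting after 5 seconds. Roughly: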

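import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Helper {
    List<HCard> solve(Executor e, Collection<Callable<HCard>> solvers)
            throws InterruptedException {
        CompletionService<HCard> cs = new ExecutorCompletionService<HCard>(e);
        int n = solvers.size();
        List<HCard> result = new ArrayList<HCard>();

        for (Callable<HCard> s : solvers) {
            cs.submit(s);
        }

        long timeleft = 5000; // total waiting budget in milliseconds
        for (int i = 0; i < n && timeleft > 0; ++i) {
            long t = System.currentTimeMillis();
            // poll returns null if no task completes within the remaining time
            Future<HCard> future = cs.poll(timeleft, TimeUnit.MILLISECONDS);
            timeleft -= System.currentTimeMillis() - t;
            if (future == null) {
                break; // timed out
            }
            try {
                HCard hCard = future.get(); // already completed, so this does not block
                if (hCard != null) {
                    result.add(hCard);
                }
            } catch (ExecutionException e1) {
                // this task failed; skip it and keep collecting the others
            }
        }
        return result;
    }
}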

I have not tested this, though.
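For comparison, ExecutorService.invokeAll(tasks, timeout, unit) gives a similar bounded wait without the manual time bookkeeping: it returns once all tasks have completed or the timeout has expired, and cancels the tasks that did not complete. This is a different technique from the poll loop above, shown only as a sketch; the class InvokeAllWithTimeout, its method names, and the String results (standing in for HCard) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllWithTimeout {
    // Runs all tasks with one overall timeout and returns the results of
    // those that finished normally, in submission order.
    static <T> List<T> collect(ExecutorService pool, List<Callable<T>> tasks, long timeoutMillis) {
        List<T> results = new ArrayList<T>();
        try {
            List<Future<T>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<T> f : futures) {
                if (f.isCancelled()) {
                    continue; // did not finish before the timeout
                }
                try {
                    results.add(f.get()); // completed, so this does not block
                } catch (ExecutionException failed) {
                    // the task threw; skip it
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    // Two quick tasks and one that sleeps far longer than the timeout.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            tasks.add(() -> "fast-1");
            tasks.add(() -> { Thread.sleep(5000); return "slow"; });
            tasks.add(() -> "fast-2");
            return collect(pool, tasks, 500);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [fast-1, fast-2]
    }
}
```

As with the other approaches, cancellation only interrupts the worker threads, so tasks that ignore interrupts keep running in the background even though their results are discarded.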

1

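The problem is that you always fetch every single result, so the code will always run to completion. I would do it with a CountDownLatch, as in the code below.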

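Also, don't use Executors.newCachedThreadPool: it is likely to spawn a lot of threads (up to 300 if the tasks take any time, because the pool creates a new thread whenever no idle thread is available to take a task).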
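To make the thread-count difference concrete, here is a small sketch that submits tasks which all block until the last one has been submitted, then reads ThreadPoolExecutor.getLargestPoolSize(). The class PoolGrowth and the task count of 50 are hypothetical, and the casts assume that the Executors factory methods return ThreadPoolExecutor instances, which holds for OpenJDK but is not promised by their declared return type.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowth {
    // Submits `tasks` blocking tasks and reports the largest number of
    // threads the pool ever held at the same time.
    static int largestPoolSize(ThreadPoolExecutor pool, int tasks) {
        final CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stay busy until every task has been submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown(); // let all tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        // Assumption: both factory methods return ThreadPoolExecutor (true for OpenJDK).
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        System.out.println("cached: " + largestPoolSize(cached, 50)); // prints cached: 50
        System.out.println("fixed: " + largestPoolSize(fixed, 50));   // prints fixed: 4
    }
}
```

With the cached pool every busy task gets its own thread, while the fixed pool queues the surplus tasks and never exceeds its configured size.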

All the classes are inlined to make this easier: paste the whole code block into a class named LotsOfTasks and run it.

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collection; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class LotsOfTasks { 
    private static final int SIZE = 300; 

    public static void main(String[] args) throws InterruptedException { 

     String[] allUrls = generateUrls(SIZE); 

     Collection<Callable<HCard>> solvers = new ArrayList<Callable<HCard>>(); 
     for (final String currentUrl : allUrls) { 
      solvers.add(new Callable<HCard>() { 
       public HCard call() { 
        HCard hCard = HCardParser.parseOne(currentUrl); 
        if (hCard != null) { 
         return hCard; 
        } else { 
         return null; 
        } 
       } 
      }); 
     } 
     ExecutorService execService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // One thread per cpu, ideal for compute-bound 
     Helper helper = new Helper(); 

     System.out.println("Starting.."); 
     long start = System.nanoTime(); 
     List<HCard> result = helper.solve(execService, solvers, 5); 
     long stop = System.nanoTime(); 
     for (HCard hCard : result) { 
      System.out.println("hCard = " + hCard); 
     } 

     System.out.println("Took: " + TimeUnit.SECONDS.convert((stop - start), TimeUnit.NANOSECONDS) + " seconds"); 
    } 

    private static String[] generateUrls(final int size) { 
     String[] urls = new String[size]; 
     for (int i = 0; i < size; i++) { 
      urls[i] = "" + i; 
     } 
     return urls; 
    } 

    private static class HCardParser { 
     private static final Random random = new Random(); 

     public static HCard parseOne(String currentUrl) { 
      try { 
       Thread.sleep(random.nextInt(1000)); // Wait for a random time up to 1 seconds per task (simulate some activity) 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it 
      } 
      return new HCard(currentUrl); 
     } 
    } 

    private static class HCard { 
     private final String currentUrl; 

     public HCard(String currentUrl) { 
      this.currentUrl = currentUrl; 
     } 

     @Override 
     public String toString() { 
      return "HCard[" + currentUrl + "]"; 
     } 
    } 

    private static class Helper { 
     List<HCard> solve(ExecutorService e, Collection<Callable<HCard>> solvers, int timeoutSeconds) throws InterruptedException { 

      final CountDownLatch latch = new CountDownLatch(solvers.size()); 

      final ConcurrentLinkedQueue<HCard> executionResults = new ConcurrentLinkedQueue<HCard>(); 

      for (final Callable<HCard> s : solvers) { 
       e.submit(new Callable<HCard>() { 
        public HCard call() throws Exception { 
         try { 
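          HCard hCard = s.call(); 
          if (hCard != null) { // ConcurrentLinkedQueue throws NullPointerException on null elements 
           executionResults.add(hCard); 
          } 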
         } finally { 
          latch.countDown(); 
         } 
         return null; 
        } 
       }); 
      } 

      latch.await(timeoutSeconds, TimeUnit.SECONDS); 

      final List<Runnable> unfinishedTasks = e.shutdownNow(); 
      System.out.println("There were " + unfinishedTasks.size() + " urls not processed"); 

      return Arrays.asList(executionResults.toArray(new HCard[executionResults.size()])); 
     } 
    } 
} 

Typical output on my system looks like this:

Starting.. 
There were 279 urls not processed 
hCard = HCard[0] 
hCard = HCard[1] 
hCard = HCard[2] 
hCard = HCard[3] 
hCard = HCard[5] 
hCard = HCard[4] 
hCard = HCard[6] 
hCard = HCard[8] 
hCard = HCard[7] 
hCard = HCard[10] 
hCard = HCard[11] 
hCard = HCard[9] 
hCard = HCard[12] 
hCard = HCard[14] 
hCard = HCard[15] 
hCard = HCard[13] 
hCard = HCard[16] 
hCard = HCard[18] 
hCard = HCard[17] 
hCard = HCard[20] 
hCard = HCard[19] 
Took: 5 seconds