2011-11-02 35 views
7

我有一個寬度爲10的固定線程池ExecutorService和一個100 Callable的列表,每個等待20秒並記錄它們的中斷。Java ExecutorService invokeAll()中斷

我在單獨的線程中調用了該列表上的invokeAll,並且幾乎立即中斷了此線程。 ExecutorService執行按預期中斷,但由Callables記錄的實際中斷數遠遠超過預期的10-20-40。爲什麼這樣,如果ExecutorService可以同時執行不超過10個線程?

全部源:(您可能需要更多的,一旦由於併發運行)

@Test 
public void interrupt3() throws Exception{ 
    int callableNum = 100; 
    int executorThreadNum = 10; 
    final AtomicInteger interruptCounter = new AtomicInteger(0); 
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); 
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); 
    for (int i = 0; i < callableNum; ++i) { 
     executeds.add(new Waiter(interruptCounter)); 
    } 
    Thread watcher = new Thread(new Runnable() { 

     @Override 
     public void run(){ 
      try { 
       executorService.invokeAll(executeds); 
      } catch(InterruptedException ex) { 
       // NOOP 
      } 
     } 
    }); 
    watcher.start(); 
    Thread.sleep(200); 
    watcher.interrupt(); 
    Thread.sleep(200); 
    assertEquals(10, interruptCounter.get()); 
} 

// This class just waits for 20 seconds, recording it's interrupts 
private class Waiter implements Callable <Object> { 
    private AtomicInteger interruptCounter; 

    public Waiter(AtomicInteger interruptCounter){ 
     this.interruptCounter = interruptCounter; 
    } 

    @Override 
    public Object call() throws Exception{ 
     try { 
      Thread.sleep(20000); 
     } catch(InterruptedException ex) { 
      interruptCounter.getAndIncrement(); 
     } 
     return null; 
    } 
} 

使用的WinXP 32位,甲骨文JRE 1.6.0_27和JUnit4

+0

嗯......將它轉換成一個程序,主要方法,我一直(在Windows Java 7中)得到10 ... –

+0

做同樣的,有37(1.6.0_27中,Windows XP)。沒有Java 7測試,有人可以確認嗎? –

+0

我會努力工作。也許這是一個Java 6的bug ... –

回答

4

我與假設不同意你應該只接收10箇中斷。

Assume the CPU has 1 core. 
1. Main thread starts Watcher and sleeps 
2. Watcher starts and adds 100 Waiters then blocks 
3. Waiter 1-10 start and sleep in sequence 
4. Main wakes and interrupts Watcher then sleeps 
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 
6. Waiter 11-13 start and sleep 
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 
8. Waiter 14-20 are "started" resulting in a no-op 
9. Waiter 21-24 start and sleep 
.... 

從本質上講,我的觀點是,有沒有保證,觀察者線程將被允許取消所有100個「服務員」 RunnableFuture情況下,它具有產生時間片,並允許的ExecutorService的工作線程開始更前服務員任務。

更新:顯示代碼AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    if (tasks == null) 
     throw new NullPointerException(); 
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
    boolean done = false; 
    try { 
     for (Callable<T> t : tasks) { 
      RunnableFuture<T> f = newTaskFor(t); 
      futures.add(f); 
      execute(f); 
     } 
     for (Future<T> f : futures) { 
      if (!f.isDone()) { 
       try { 
        f.get(); //If interrupted, this is where the InterruptedException will be thrown from 
       } catch (CancellationException ignore) { 
       } catch (ExecutionException ignore) { 
       } 
      } 
     } 
     done = true; 
     return futures; 
    } finally { 
     if (!done) 
      for (Future<T> f : futures) 
       f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads 
    } 
} 

的finally塊包含f.cancel(true)是當中斷將傳播到當前正在運行的任務。如您所見,這是一個緊密的循環,但不能保證執行循環的線程能夠在一個時間片內遍歷Future的所有實例。

+0

所以你說,invokeAll()的中斷並不意味着在中斷正在運行的任務之前立即取消所有排隊的任務?對我而言,這看起來像是最小驚訝原則的直接突破。 –

+1

正確。處理任務的工作線程與執行'invokeAll()'的線程不同。在一個線程中調用中斷並不意味着任何其他線程應該被中斷,所以我一點都不驚訝,你能接受有時工作線程的10餘個中斷。正如我在發佈的帶註釋的代碼中所提到的,僅將中斷髮送給處理該任務的工作線程,作爲傳遞給'Future.cancel'方法的布爾參數的優點。 –

0
PowerMock.mockStatic (Executors.class); 
EasyMock.expect (Executors.newFixedThreadPool (9)).andReturn (executorService); 

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock (Future.class); 
EasyMock.expect (callableMock.get (EasyMock.anyLong(), EasyMock.isA (TimeUnit.class))).andReturn (ccs).anyTimes(); 

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>>(); 
futures.add (callableMock); 
EasyMock.expect (executorService.invokeAll (EasyMock.isA (List.class))).andReturn (futures).anyTimes(); 

executorService.shutdown(); 
EasyMock.expectLastCall().anyTimes(); 

EasyMock.expect (mock.getMethodCall ()).andReturn (result).anyTimes(); 

PowerMock.replayAll(); 
EasyMock.replay (callableMock, executorService, mock); 

Assert.assertEquals (" ", answer.get (0)); 
PowerMock.verifyAll(); 
1

如果你想要達到相同的行爲

ArrayList<Runnable> runnables = new ArrayList<Runnable>(); 
    executorService.getQueue().drainTo(runnables); 

添加此塊之前中斷的線程池。

它會將所有等待隊列排到新列表中。

所以它只會中斷正在運行的線程。