2011-03-10 98 views
5

任務,我提交使用裹着一個2線FixedThreadPool的ExecutorService一個CompletionService一些未來的某個任務,我設置,則設置等於​​提交的任務數的循環和使用completionservice。採取()等待他們全部完成或失敗。麻煩很偶然,它永遠不會完成(但我不知道爲什麼),所以我將take()方法更改爲poll(300,Timeout.SECONDS),這個想法是,如果一個任務需要超過5分鐘才能完成民意調查將失敗,然後最終會退出循環,我可以通過所有的未來,並調用future.cancel(true)強制取消違規任務。如何取消所花費的時間太長使用CompletionService

但是,當我運行代碼並掛起時,我發現輪詢每5分鐘連續失敗一次,並且沒有更多任務運行,因此我認爲這兩個工作人員以某種方式死鎖,並且永遠不會結束,並且決不允許執行其他任務開始。因爲超時時間爲5分鐘,並且仍有1000個任務需要運行,所以打破循環的時間太長,因此取消了這項工作。

所以我想要做的就是interupt /力消除當前的任務,如果處理不當在5分鐘內完成,但我看不到任何方式做到這一點。

此代碼示例說明了什麼林談論

import com.jthink.jaikoz.exception.JaikozException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println("Worker TimedOut:"); 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && result.get()) 
         { 
          System.out.println("Worker Completed:"); 
         } 
         else 
         { 
          System.out.println("Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println("Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
    { 
     if(number==3) 
     { 
      try 
      { 
       Thread.sleep(50000); 
      } 
      catch(InterruptedException tie) 
      { 

      } 
     } 
     return true; 
    } 
} 

輸出

Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker TimedOut: 
Done 
+0

@ user294896 - 你能在一個小的,獨立的例子提供了一些示例代碼? – justkt 2011-03-10 14:02:52

+0

@justkt我可以試試,可能需要一點時間 – 2011-03-10 14:05:51

+0

爲什麼這個任務本身不能意識到它需要很長的時間和中止?這將大大簡化事情。 – trojanfoe 2011-03-10 14:10:13

回答

4

我認爲香港專業教育學院解決了這個問題,基本上如果發生超時,然後我通過未來的對象列表進行迭代,找到第一個還沒有完成,取消力。似乎並不高雅,但似乎工作。

伊夫改變池的大小隻是爲了顯示更好的演示解決方案,但與2線程池的作品,以及輸出。

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       System.out.println("Invocation:"+t); 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println(new Date()+":Worker Timedout:"); 
        //So lets cancel the first futures we find that havent completed 
        for(Future future:futures) 
        { 
         System.out.println("Checking future"); 
         if(future.isDone()) 
         { 
          continue; 
         } 
         else 
         { 
          future.cancel(true); 
          System.out.println("Cancelled"); 
          break; 
         } 
        } 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && !result.isCancelled() && result.get()) 
         { 
          System.out.println(new Date()+":Worker Completed:"); 
         } 
         else if(result.isDone() && !result.isCancelled() && !result.get()) 
         { 
          System.out.println(new Date()+":Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(System.out); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println(new Date()+":Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
     throws InterruptedException 
    { 
     try 
     { 
      if(number==3) 
      { 
       Thread.sleep(50000); 
      } 
     } 
     catch(InterruptedException ie) 
     { 
      System.out.println("Worker Interuppted"); 
      throw ie; 
     } 
     return true; 
    } 
} 

輸出是

Invocation:0 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:1 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:2 
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout: 
Checking future 
Checking future 
Checking future 
Cancelled 
Invocation:3 
Worker Interuppted 
Invocation:4 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Invocation:5 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Thu Mar 10 20:51:49 GMT 2011:Done 
2

在你的工人例子,你可贖回阻止在支持中斷通話的簡化版本。如果您的真實代碼在固有鎖定(​​塊)上發生死鎖,您將無法通過中斷取消它。相反,您可以使用顯式鎖定(java.util.concurrent.Lock),該鎖定允許您指定要等待鎖定獲取的時間。如果線程超時等待鎖定,可能是因爲它遇到了死鎖情況,則可能會因錯誤消息而中止。

順便說一句,在你的例子中,你的Callable不應該吞下InterruptedException。您應該通過它(重新拋出,或添加InterruptedException到拋出你的方法報關行),或在catch塊,重置線程的中斷狀態(與Thread.currentThread().interrupt())。

+0

工作者代碼並不真正代表真正的工人,我只是放了一些東西,以便代碼能夠運行。問題是如何處理CompletionService/ExecutorService,我不知道哪個未來正在運行/出現問題,我該如何取消似乎有問題的服務中的任務。 – 2011-03-10 17:29:26

+0

ps:你還應該在代碼中的關鍵點檢查Thread.currentThread()。isInterrupted()。除非您在阻止操作中支持它,否則通常不會引發InterruptedException。您可以使用isInterrupted()方法檢查是否發生了中斷,然後停止並清理任務。 – Matt 2012-10-02 04:04:13

1

您可以隨時撥打future.get(timeout...)
它將返回超時異常,如果它還沒有完成......然後,你可以調用future.cancel()