2011-02-10 17 views
9

我有一個要求任務異步執行,同時放棄任何進一步的請求,直到任務完成。單線程的任務,而不排隊進一步的請求

同步該方法只是排隊的任務,不跳過。我最初認爲要使用SingleThreadExecutor,但它也排隊完成任務。然後,我查看了ThreadPoolExecutor,但它讀取隊列以獲取要執行的任務,因此將執行一個任務,並且排隊最少一個任務(其他人可以使用ThreadPoolExecutor.DiscardPolicy放棄)。

我唯一能想到的就是使用Semaphore來阻塞隊列。我用下面的例子來展示我想要達到的目標。有一種更簡單的方法嗎?我錯過了明顯的東西嗎?

import java.util.concurrent.*; 

public class ThreadPoolTester { 
    private static ExecutorService executor = Executors.newSingleThreadExecutor(); 
    private static Semaphore processEntry = new Semaphore(1); 

    public static void main(String[] args) throws InterruptedException { 
     for (int i = 0; i < 20; i++) { 
      kickOffEntry(i); 

      Thread.sleep(200); 
     } 

     executor.shutdown(); 
    } 

    private static void kickOffEntry(final int index) { 
     if (!processEntry.tryAcquire()) return; 
     executor. 
      submit(
       new Callable<Void>() { 
        public Void call() throws InterruptedException { 
         try { 
          System.out.println("start " + index); 
          Thread.sleep(1000); // pretend to do work 
          System.out.println("stop " + index); 
          return null; 

         } finally { 
          processEntry.release(); 
         } 
        } 
       } 
      ); 
    } 
} 

樣本輸出

start 0 
stop 0 
start 5 
stop 5 
start 10 
stop 10 
start 15 
stop 15 

以axtavt答案,並轉換上述例子給出了以下簡單的解決方案。

import java.util.concurrent.*; 

public class SyncQueueTester { 
    private static ExecutorService executor = new ThreadPoolExecutor(1, 1, 
      1000, TimeUnit.SECONDS, 
      new SynchronousQueue<Runnable>(), 
      new ThreadPoolExecutor.DiscardPolicy()); 

    public static void main(String[] args) throws InterruptedException { 
     for (int i = 0; i < 20; i++) { 
      kickOffEntry(i); 

      Thread.sleep(200); 
     } 

     executor.shutdown(); 
    } 

    private static void kickOffEntry(final int index) { 
     executor. 
      submit(
       new Callable<Void>() { 
        public Void call() throws InterruptedException { 
         System.out.println("start " + index); 
         Thread.sleep(1000); // pretend to do work 
         System.out.println("stop " + index); 
         return null; 
        } 
       } 
      ); 
    } 
} 

回答

10

它看起來像由SynchronousQueue與預期的政策支持執行你想要做什麼:

executor = new ThreadPoolExecutor(
    1, 1, 
    1000, TimeUnit.SECONDS, 
    new SynchronousQueue<Runnable>(), 
    new ThreadPoolExecutor.DiscardPolicy()); 
+0

Coudl你告訴我,爲什麼你喜歡使用一個傳播信號參考其他線程的堆棧DiscardPolicy捕獲RejectedExecutionException? – 2012-10-16 01:06:11

0

如果沒有排隊,沒有必要爲遺囑執行人我說。單獨使用信號量似乎就夠了。我使用下面的代碼來避免在運行時運行相同的代碼。只需確保semaphorestatic volatile,這使得信號的唯一信號量的類,並儘快將其改變

if (this.getSemaphore().tryAcquire()) { 
     try { 
      process(); 
     } catch (Exception e) { 
     } finally { 
      this.getSemaphore().release(); 
     } 
} 
else { 
    logger.info(">>>>> Job already running, skipping go"); 
} 
+0

該任務需要在單獨的線程中運行,以便主線程不被阻塞或受到其他影響(「主線」實際上是一個swing應用程序) – 2011-02-10 08:57:21