2013-04-07 36 views
4

同時這裏的基本上是我的問題:防止多個異步調用,而不阻塞

while (true) { 
    if (previous 'doWorkAsync' method is not still in flight) { 
    doWorkAsync() // this returns immediately 
    } 
    wait set amount of time 
} 

一對夫婦的解決方案浮現在腦海中對我說:

  1. 的位置,直到doWorkAsync()完成。出於幾個原因,這對我來說並不合適。 它可能導致等待的時間比我真正需要的等待時間長一些(例如,如果doWorkAsync需要5秒鐘,並且等待時間設置爲10秒,則會導致15秒在通話之間等待,這不是我想要的)。當然,我可以通過等待更少的時間來解釋這一點,但不知何故,它感覺很笨拙。 它也不必要地關聯這個線程。該線程可以處理其他工作,例如進行配置更新,以便下一次調用doWorkAsync()具有新的數據,而不是等待此任務回來。

  2. 使用門控機制。想到的最簡單的實現是一個布爾值,在調用doWorkAsync()之前設置,在doWorkAsync()完成時取消設置。這實際上是我現在正在做的,但我不確定它是否是反模式?

是#2正確的路要走,還是有更好的方法來解決這個問題?

編輯:如果有幫助,doWorkAsync()返回一個ListenableFuture(的番石榴)。

原始問題可能沒有100%清楚。癥結所在。如果異步請求在給定超時之前完成,則此代碼將始終有效。但是,如果異步任務需要SET_AMOUNT_OF_TIME + epsilon才能完成,那麼此代碼將根據需要休眠兩次,這正是我試圖避免的。

+0

以零超時加入異步工作的線程有什麼問題 – 2013-04-07 01:49:51

+0

@EliAlgranti我不想阻塞。 Thread.join()將阻塞,直到異步任務完成。 – knightry 2013-04-07 04:51:53

+0

你處於無限循環中,正在睡覺。阻塞如何成爲一個問題? – user949300 2013-04-07 06:57:43

回答

1

最簡單的方法是使用已在Java中的waitnotifyAll方法。您只需使用AtomicBoolean作爲標誌並將其阻止,直到另一個Thread告訴您某些內容已更改。

與您的方法之間的區別是阻塞的線程不會執行任何操作,而輪詢線程使用CPU時間。

下面是使用兩個Thread個簡單的例子 - RunnableFirst」提交併等待在done直到RunnableSecond」通知它改變了標誌。

public class App { 

    private static final AtomicBoolean done = new AtomicBoolean(false); 

    private static final class First implements Runnable { 

     @Override 
     public void run() { 
      while (!done.get()) { 
       System.out.println("Waiting."); 
       synchronized (done) { 
        try { 
         done.wait(); 
        } catch (InterruptedException ex) { 
         return; 
        } 
       } 
      } 
      System.out.println("Done!"); 
     } 
    } 

    private static final class Second implements Runnable { 

     @Override 
     public void run() { 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException ex) { 
       return; 
      } 
      done.set(true); 
      synchronized (done) { 
       done.notifyAll(); 
      } 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     final ExecutorService executorService = Executors.newFixedThreadPool(2); 

     executorService.submit(new First()); 
     Thread.sleep(1000); 
     executorService.submit(new Second()); 
     executorService.shutdown(); 

    } 
} 

sleep電話是隻是爲了顯示任意長度的任務,可以發生,顯然他們不是必需的。

需要注意的是,First每次打印「等待」它進入循環,如果運行代碼,它只打印一次。第二件要注意的事情是,First立即響應標誌的變化,因爲它被告知在標誌改變時清醒並重新檢查。

我在InterruptedException塊中使用了return,您可能想要使用Thread.currentThread().interrupt(),以便過程在虛假中斷時不會死亡。

一種更先進的方法是使用LockCondition

public class App { 

    private static final Lock lock = new ReentrantLock(); 
    private static final Condition condition = lock.newCondition(); 

    private static final class First implements Runnable { 

     @Override 
     public void run() { 
      lock.lock(); 
      System.out.println("Waiting"); 
      try { 
       condition.await(); 
      } catch (InterruptedException ex) { 
       return; 
      } finally { 
       lock.unlock(); 
      } 
      System.out.println("Done!"); 
     } 
    } 

    private static final class Second implements Runnable { 

     @Override 
     public void run() { 
      lock.lock(); 
      try { 
       Thread.sleep(1000); 
       condition.signalAll(); 
      } catch (InterruptedException ex) { 
       return; 
      } finally { 
       lock.unlock(); 
      } 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     final ExecutorService executorService = Executors.newFixedThreadPool(2); 

     executorService.submit(new First()); 
     Thread.sleep(1000); 
     executorService.submit(new Second()); 
     executorService.shutdown(); 

    } 
} 

在此情況下First獲取Lock對象上的鎖立即調用awaitCondition。釋放鎖和Condition上的塊。

Second那麼獲取關於Lock的鎖定和其上醒來FirstCondition調用signalAll

First然後重新獲得鎖定並繼續執行,打印「完成!」。

EDIT

的OP想與在指定時間要調用的方法doWorkAsync,如果該方法花費較少的時間比,則週期的過程中必須等待。如果方法需要更長的時間,那麼應該立即再次調用該方法。

任務需要在一段時間後停止。

該方法不應該同時運行多次。

最簡單的方法是調用從ScheduledExecutorService的方法,該Runnable想包的方法和對Futureget - 阻斷安排執行程序,直到它完成。

這保證用至少調用WAIT_TIME_BETWEEN_CALLS_SECS延遲的方法。

然後計劃在設定時間後殺死第一個任務的另一個任務。

final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); 
final Future<?> taskHandle = scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 
    @Override 
    public void run() { 
     final ListenableFuture<Void> lf = doWorkAsync(); 
     try { 
      doWorkAsync().get(); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } catch (ExecutionException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 
}, 0, WAIT_TIME_BETWEEN_CALLS_SECS, TimeUnit.SECONDS); 
scheduledExecutorService.schedule(new Runnable() { 
    @Override 
    public void run() { 
     taskHandle.cancel(false); 
    } 
}, TOTAL_TIME_SECS, TimeUnit.SECONDS); 

最好的解決辦法是調用一個ScheduledExecutorService原始Runnable,而不是調用它在另一個執行人及上阻塞ListenableFuture

+0

我不是這些方法的巨大粉絲,因爲他們需要異步任務知道調用者(他們都使用全局可用的標誌)。我更喜歡調用者執行所有邏輯的方法,並且異步任務可以愉快地啓動異步任務並返回可監聽的聲音。 – knightry 2013-04-07 04:42:58

+0

你總是可以將你以前的'doWorkAsync'方法包裝在'WaitingRunnable'中,或者這樣做,以便在那裏完成阻塞。該標誌不必是全局的,它可以傳遞給'Runnables'。就我所見,您提出的解決方案仍然使用'Thread.sleep' - 這總是**一個糟糕的主意並且代表了代碼異味。該解決方案不會睡眠任意長度的時間,但會阻塞,直到任務完成的時刻。 – 2013-04-07 10:08:16

+0

如何確保我至少能夠等待所需的時間,而無需睡眠?異步任務可能比我允許再次調用該函數之前的時間快得多。 – knightry 2013-04-07 19:48:52

0

想想你要找的是The Reactor Pattern

你有什麼理由不希望這些東西同時運行嗎?如果你想要做的是鏈接它們,你可以使用期貨。 Akka有可堆肥期貨和可映射的期貨。

+0

反應堆模式看起來與我之後的不同。我只有一個未來獲得回報,我需要關注,而不是多個投入來應對。 此外,不知道我理解你的問題。你指的是哪些'東西'?你能詳細說明我可以如何使用期貨來獲得等待設定時間的期望行爲嗎? – knightry 2013-04-07 04:56:18

+0

當你添加睡覺,你阻止了線程。 – Rob 2013-04-08 01:07:07

+0

我提起了Reactor,因爲我以爲你是在試圖阻止線程互相侵入。或者你試圖鏈接,這就是爲什麼我說或者。期貨被鏈接,發生的事情是,當第一個完成時,第二個被啓動,並且沒有睡覺或等待。 – Rob 2013-04-08 01:09:24