2017-10-11 75 views
0

我有以下工作代碼:加入多個回調執行在CompletableFuture

DiscoveryCallback callback = new DiscoveryCallback(); 
Manager.discover(someparam, callback); 

我想這個調用包裝成一個CompletableFuture有RX-ISH API與其他異步操作組成。

Manager.discover()是一個第三方庫,實際上是一個用於本機功能結合的方法,並且它多次執行回調,在不同的線程。

我DiscoveryCallback實現以下接口:

interface onFoundListerner { 
    onFound(List<Result> results) 
    onError(Throwable error) 
} 

我試圖注入的CompletableFuture<List<Result>>一個實例爲DiscoveryCallback,然後調用完成方法。對於一個回調執行它可以正常工作,而其他回調則被忽略。

我怎樣才能加入這個多次執行的結果,使我的包裝返回單個CompletableFuture?

+0

也許一個'Iterator'或'Stream'將比'CompletableFuture's更合適。 – acelent

+0

這樣我的包裝將被阻止。我需要客戶端上的反應式API,但我不能依賴RxJava。順便說一句,Observables一切都按預期工作。 – thiagogcm

回答

0

什麼異步隊列?

public class AsyncQueue<T> { 
    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Void> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(null); 
     } 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Void> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(v -> removeAsync()); 
    } 
} 

在Java 9,你可以使用.completeOnTimeout(null, timeout, unit)removeAsync返回的CompletableFuture有超時機制。

的Java 9之前,你需要安排自己的超時。下面是使用嵌入式超時調度版本:

public class AsyncQueue<T> { 
    static final ScheduledExecutorService scheduledExecutorService; 

    static { 
     ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new ScheduledThreadFactory()); 
     scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true); 
     scheduledExecutorService = Executors.unconfigurableScheduledExecutorService(scheduledThreadPoolExecutor); 
    } 

    static final class ScheduledThreadFactory implements ThreadFactory { 
     static AtomicInteger scheduledExecutorThreadId = new AtomicInteger(0); 

     static final synchronized int nextScheduledExecutorThreadId() { 
      return scheduledExecutorThreadId.incrementAndGet(); 
     } 

     @Override 
     public Thread newThread(Runnable runnable) { 
      Thread thread = new Thread(runnable, "AsynchronousSemaphoreScheduler-" + nextScheduledExecutorThreadId()); 
      thread.setDaemon(true); 
      return thread; 
     } 
    } 

    private final Object lock = new Object(); 
    private final Queue<T> queue = new ArrayDeque<T>(); 
    private CompletableFuture<Long> removeCf = new CompletableFuture<>(); 

    public void add(T item) { 
     synchronized (lock) { 
      queue.add(item); 
      removeCf.complete(System.nanoTime()); 
     } 
    } 

    public CompletableFuture<T> removeAsync(long timeout, TimeUnit unit) { 
     if (unit == null) throw new NullPointerException("unit"); 

     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else if (timeout <= 0L) { 
       return CompletableFuture.completedFuture(null); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     long startTime = System.nanoTime(); 
     long nanosTimeout = unit.toNanos(timeout); 
     CompletableFuture<T> itemCf = currentCf 
      .thenCompose(endTime -> { 
       long leftNanosTimeout = nanosTimeout - (endTime - startTime); 
       return removeAsync(leftNanosTimeout, TimeUnit.NANOSECONDS); 
      }); 
     ScheduledFuture<?> scheduledFuture = scheduledExecutorService 
      .schedule(() -> itemCf.complete(null), timeout, unit); 
     itemCf 
      .thenRun(() -> scheduledFuture.cancel(true)); 
     return itemCf; 
    } 

    public CompletableFuture<T> removeAsync() { 
     CompletableFuture<Long> currentCf = null; 
     synchronized (lock) { 
      T item = queue.poll(); 
      if (item != null) { 
       return CompletableFuture.completedFuture(item); 
      } 
      else { 
       if (removeCf.isDone()) { 
        removeCf = new CompletableFuture<>(); 
       } 
       currentCf = removeCf; 
      } 
     } 
     return currentCf 
      .thenCompose(endTime -> removeAsync()); 
    } 
} 

您可以重構調度出這個類與其他類共享,也許成採用工廠在.properties文件建立並訴諸於一個單該示例中的默認值如果未配置。

您可以使用ReentrantLock而不是​​語句來獲得這一點性能。它應該只在重大的爭論中才重要,但AsyncQueue<T>可以用於這種目的。

相關問題