2014-12-13 23 views
0

我是做的Java CompletableFuture.anyOf(manyfutures).thenRun(可運行)只運行一次

CompletableFuture.anyOf(manyfutures).thenRun(new Runnable() { } } 

但在可運行的代碼只運行一次!每次期貨交易完成時,我都期待它運行多次。

如何在任何期貨的每次交易完成時運行一段代碼?在最佳的方式,這意味着該不會做:

public static void append(final CompletableFuture[] futures, Runnable runnable) { 
      for (CompletableFuture future : futures) { 
        future.thenRun(runnable); 
      } 
    } 

編輯

我使用,我想追加更多的工作時,可運行的X數量已經執行到的ThreadPoolExecutor。

有沒有辦法聽到這個,並提供更多的工作,當發生?

另一種方法是,我在開始時堆疊了數千個工作,但這也不是最優的。

我做

... queue = new LinkedBlockingQueue<Runnable>(); 
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue); 
+0

@Holger那麼,我有一個100萬行的數據庫,我提取並做異步工作。而不是獲取100萬行,我獲取500並有20個線程。真的,我的問題更多的是要了解這20個線程何時在500個數據庫中的70%上工作,以便從數據庫獲取更多數據,但所有線程實際上都處於忙碌狀態。我認爲我的編輯有點不正確。現在有道理嗎? :)我已經在下面提供了我的答案。 – momomo 2014-12-16 16:03:29

+0

這更有意義。但是,使用具有明確限制而非無限制LinkedBlockingQueue的'ArrayBlockingQueue'將是更簡單的解決方案。 – Holger 2014-12-17 09:50:57

回答

1

這裏是我跟蹤的這個答案,這個方法需要幾個參數,便於學習:

/** Basically creates to-start number of futures in a while loop, while passing the index to a Lambda that is passed and that returns a Runnable which will have access to the index. See example below. **/ 
    public static CompletableFuture[] async(ExecutorService executorService, int start, int to, Runnable beforeAll, Lambda.R1<Runnable, Integer> onEach, Double onPercentage, Runnable onPercentageRun, Runnable afterAll) { 
      CompletableFuture[] futures = new CompletableFuture[to-start]; 

      double  onPercentageIndex = Valid.elvis(onPercentage, 0.0) * futures.length; // When to onPercentageRun 
      AtomicBoolean percentageMet  = new AtomicBoolean (false); 
      AtomicBoolean completeMet  = new AtomicBoolean (false); 
      AtomicInteger complete   = new AtomicInteger (0 ); 

      int i = start; 
      if (i < to && beforeAll != null) { 
        beforeAll.run(); 
      } 
      boolean percentageSet = onPercentageIndex > 0.0 && onPercentageRun != null; 
      boolean completeSet = afterAll != null; 
      while(i < to) { 

        Runnable call = onEach.call(i); 
        futures[i-start] = CompletableFuture.runAsync(

          () -> { 
            try { 
              call.run(); 
            } catch (Throwable e) { 
              $Log.info(Concurrency.class, "RunAsync: run", e); 
            } 

            if (percentageSet || completeSet) { 
              complete.incrementAndGet(); 

              if (percentageSet && !percentageMet.get() && complete.get() >= onPercentageIndex) { 
                percentageMet.set(true); 

                try { 
                  onPercentageRun.run(); 
                } 
                catch(Throwable e) { 
                  $Log.info(Concurrency.class, "RunAsync: onPercentage", e); 
                } 
              } 

              if (completeSet && !completeMet.get() && complete.get() == to) { 
                completeMet.set(true); // Just for clarity, propably redundant 

                try { 
                  afterAll.run(); 
                } 
                catch(Throwable e) { 
                  $Log.info(Concurrency.class, "RunAsync: onComplete", e); 
                } 

              } 
            } 

          }, 

          executorService 
        ); 

        ++i; 
      } 

      return futures; 
    } 

爲什麼Lambda參考。R 1是,看到這種情況,Lambda interfaces

的方法,可以這樣使用:

private void recursivelyPopulateDataFiles(long fromId) { 

      List<Localfile> unproccessed = DB.fetchAllFilesFromId(fromId, limit); 

      if (unproccessed.size() > 0) { 

        Concurrency.async(
          THE_EXECUTOR, 

          0, 

          unproccessed.size(), 

          ITERATIONS_COUNTER_IN_PROGRESS_CAN_BE_USED_BY_OTHERS_TO_LISTEN_GLOBALLY_FOR_WHEN_IT_HITS_ZERO_AGAIN::incrementAndGet, 

          (Integer index) ->() -> { 
            populateDataFile(unproccessed.get(index)); 
          }, 

          0.5, 

          () -> { 
            recursivelyPopulateDataFiles(unproccessed.get(unproccessed.size() - 1).getId()); 

          }, 

          ITERATIONS_COUNTER_IN_PROGRESS_CAN_BE_USED_BY_OTHERS_TO_LISTEN_GLOBALLY_FOR_WHEN_IT_HITS_ZERO_AGAIN::decrementAndGet 
        ); 

      } 

    } 

當unproccessed大小的0.5 = 50%完成後,該塊之後執行將更多的上的執行程序。

4

看那JavaDoc

返回一個新CompletableFuture時給出任何 的CompletableFutures完整的,具有相同的結果結束。否則,如果 它異常完成,則返回的CompletableFuture也會執行 ,因此使用CompletionException作爲其原因。如果提供 否CompletableFutures,則返回不完整 CompletableFuture

該方法返回當給定CompletableFutures完整的任何。即第一個。一個方法不能多次返回。

要執行的操作CompletableFuture每個後,只需撥打theRunthenRunAsync每個CompletableFuture

如果你有一個List<CompletableFuture<T>>,你想一個CompletableFuture<List<T>>,即要「解包」期貨的集合到一個集合的將來,你可以使用這一招:

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { 
    final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); 
    return allDoneFuture.thenApply(v -> 
        futures.stream(). 
          map(future -> future.join()). 
          collect(toList()) 
    ); 
} 

this article on the usage of CompletableFuture

兩者
+0

好的,我做了,我也注意到了這種行爲。那我該怎麼做呢?優選地以最佳方式。 – momomo 2014-12-13 19:03:01

+0

所以我必須循環然後追加它。我希望不必這樣做。我想要的是運行一些東西,例如其中70%已經完成。 – momomo 2014-12-13 19:06:30

+2

您最好使用['ExecutorCompletionService'](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.htmlhttps://docs.oracle.com/ JavaSE的/ 7 /文檔/ API/JAVA/util的/並行/ ExecutorCompletionService.html)。 – 2014-12-13 19:09:45