2016-06-26 36 views
2

Merging multiple CompletableFutures

I have the following component:

private JobInfo aggregateJobInfo() { 
    final JobsResult jobsResult = restClient().getJobs(); 
    final List<String> jobIds = extractJobIds(jobsResult); 

    //fetch details, exceptions and config for each job 
    final List<JobDetails> jobDetails = jobIds.stream().map(jobId -> {
        final JobDetailResult jobDetailResult = restClient().getJobDetails(jobId);
        final JobExceptionsResult jobExceptionsResult = restClient().getJobExceptions(jobId);
        final JobConfigResult jobConfigResult = restClient().getJobConfig(jobId);
        return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
    }).collect(Collectors.toList());
    return new JobInfo(jobsResult, jobDetails); 
} 

private static List<String> extractJobIds(final JobsResult jobsResult) { 
    final ArrayList<String> jobIds = new ArrayList<>(); 
    jobIds.addAll(jobsResult.getRunning()); 
    jobIds.addAll(jobsResult.getFinished()); 
    jobIds.addAll(jobsResult.getCanceled()); 
    jobIds.addAll(jobsResult.getFailed()); 
    return jobIds; 
} 

It just calls some endpoints and aggregates some data. Now I am trying to make this non-blocking using CompletableFutures, which I have not really used before:

private CompletableFuture<JobInfo> aggregateJobInfo() { 
    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    //fetch details, exceptions and config for each job 
    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture = jobIdsFuture.thenApply(jobIds -> {
        return jobIds.stream().map(jobId -> {
            final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId);
            final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId);
            final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId);
            return jobDetailsResultFuture.thenCompose(jobDetailResult -> {
                return jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> {
                    return new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult);
                });
            });
        }).collect(Collectors.toList());
    });
    return null; // open question: how to build the CompletableFuture<JobInfo> here?
}

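My question is: how do I create the CompletableFuture<JobInfo> here, where the JobInfo is 'new JobInfo(jobsResult, jobDetails)'?

As I said, I am new to this. Maybe my approach is poor and there is a better solution?

Any ideas are appreciated, thanks.

First working version: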

private CompletableFuture<JobInfo> aggregateJobInfo() { 

    final CompletableFuture<JobsResult> jobsResultFuture = restClient().getJobs(); 
    final CompletableFuture<List<String>> jobIdsFuture = jobsResultFuture.thenApply(JobInfoCollector::extractJobIds); 

    final CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFutureListFuture = 
      jobIdsFuture.thenApply(jobIds -> jobIds.stream().map(jobId -> { 
       final CompletableFuture<JobDetailResult> jobDetailsResultFuture = restClient().getJobDetails(jobId); 
       final CompletableFuture<JobExceptionsResult> jobExceptionsFuture = restClient().getJobExceptions(jobId); 
       final CompletableFuture<JobConfigResult> jobConfigFuture = restClient().getJobConfig(jobId); 
       return jobDetailsResultFuture.thenCompose(jobDetailResult -> 
         jobExceptionsFuture.thenCombine(jobConfigFuture, (jobExceptionsResult, jobConfigResult) -> 
           new JobDetails(jobDetailResult, jobExceptionsResult, jobConfigResult))); 
      }).collect(Collectors.toList())); 

    return jobDetailsFutureListFuture.thenCompose(jobDetailsFutures -> 
      CompletableFuture.allOf(jobDetailsFutures.toArray(
        new CompletableFuture[jobDetailsFutures.size()])).thenApply(aVoid -> 
        jobDetailsFutures.stream() 
          .map(CompletableFuture::join) 
          .collect(Collectors.toList()))) 
      .thenApply(jobDetails -> jobsResultFuture.thenApply(jobsResult -> 
        new JobInfo(jobsResult, jobDetails))) 
      .join(); 
} 
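
A note on this version: the last thenApply produces a nested CompletableFuture<CompletableFuture<JobInfo>>, and the trailing join() unwraps it by blocking the calling thread until all job details have arrived. The sketch below shows one way to flatten such a nested future with thenCompose instead. It uses plain strings as hypothetical stand-ins for JobsResult, JobDetails and JobInfo; FlattenSketch and combine are illustrative names that do not appear in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in types: strings replace JobsResult/JobDetails/JobInfo.
class FlattenSketch {

    // Combines two futures without blocking the calling thread.
    static CompletableFuture<String> combine(
            CompletableFuture<List<String>> detailsFuture,
            CompletableFuture<String> headerFuture) {
        // thenApply here would yield CompletableFuture<CompletableFuture<String>>
        // and require a join(); thenCompose flattens the nested future instead.
        return detailsFuture.thenCompose(details ->
                headerFuture.thenApply(header -> header + details));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> details = new CompletableFuture<>();
        CompletableFuture<String> header = CompletableFuture.completedFuture("jobs");

        CompletableFuture<String> result = combine(details, header);
        System.out.println(result.isDone()); // false: nothing has blocked or completed yet

        details.complete(Arrays.asList("a", "b"));
        System.out.println(result.join()); // jobs[a, b]
    }
}
```

Applied to the method above, this would mean replacing the final thenApply with thenCompose and dropping the join(). Since the two inputs are independent of the mapping function, thenCombine would work equally well here.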
+1

Most of the code you pasted does not seem relevant to the question. Can you narrow it down to the minimal example you need? – the8472

+0

The question is how to "map" the future data above so that a CompletableFuture<JobInfo> is returned. –

Answers

4

You have:

  • CompletableFuture<JobsResult> jobsResultFuture
  • CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture
  • JobInfo(JobsResult a, List<JobDetails> b)

You want:

CompletableFuture<JobInfo>

Additional observation: jobDetailsFuture can only complete once jobsResultFuture has completed.

So you can implement the following:

    1. List<CompletableFuture<JobDetails>> → Void via allOf in thenCompose
    2. Void + List<CompletableFuture<JobDetails>> (as a captured var) → List<JobDetails> via thenApply
    3. List<JobDetails> + CompletableFuture<JobsResult> (as a captured var) → JobInfo via thenApply

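You can simply unwrap the futures via get() inside those mapping functions, because the futures are guaranteed to be complete at that point, thanks to the ancestor dependencies of the then-futures.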
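
The mapping steps described above could look roughly like the sketch below. JobsResult, JobDetails and JobInfo are reduced to hypothetical minimal stand-ins, and AggregateSketch and toJobInfo are illustrative names. The sketch uses join() rather than get() only because join() throws no checked exceptions and is therefore easier to call inside a lambda.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AggregateSketch {

    // Hypothetical minimal stand-ins for the types in the question.
    static class JobsResult { }

    static class JobDetails {
        final String id;
        JobDetails(String id) { this.id = id; }
    }

    static class JobInfo {
        final JobsResult jobsResult;
        final List<JobDetails> jobDetails;
        JobInfo(JobsResult jobsResult, List<JobDetails> jobDetails) {
            this.jobsResult = jobsResult;
            this.jobDetails = jobDetails;
        }
    }

    // Assumes jobDetailsFuture was derived from jobsResultFuture, as in the question.
    static CompletableFuture<JobInfo> toJobInfo(
            CompletableFuture<JobsResult> jobsResultFuture,
            CompletableFuture<List<CompletableFuture<JobDetails>>> jobDetailsFuture) {
        return jobDetailsFuture
                // Step 1: List<CompletableFuture<JobDetails>> -> Void via allOf in thenCompose.
                .thenCompose(futures -> CompletableFuture
                        .allOf(futures.toArray(new CompletableFuture<?>[0]))
                        // Step 2: ignore the Void and read the captured list instead;
                        // join() returns immediately because allOf has completed.
                        .thenApply(ignored -> futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())))
                // Step 3: combine with the captured jobsResultFuture, which is
                // already complete under the assumption stated above.
                .thenApply(details -> new JobInfo(jobsResultFuture.join(), details));
    }

    public static void main(String[] args) {
        CompletableFuture<JobsResult> jobs = CompletableFuture.completedFuture(new JobsResult());
        List<CompletableFuture<JobDetails>> futures = Arrays.asList(
                CompletableFuture.completedFuture(new JobDetails("a")),
                CompletableFuture.supplyAsync(() -> new JobDetails("b")));
        CompletableFuture<List<CompletableFuture<JobDetails>>> details = jobs.thenApply(r -> futures);

        JobInfo info = toJobInfo(jobs, details).join(); // join() only here, to print the demo result
        System.out.println(info.jobDetails.size()); // 2
    }
}
```

The Void produced by allOf carries no data. It only signals that every future in the captured list is done, which is why step 2 reads the results from the list rather than from the lambda argument.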

Other approaches using thenCombine and stream reduction would be possible, but they are more verbose and create more intermediate futures.
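
For comparison, here is a rough sketch of what such a thenCombine-based reduction might look like. ReduceSketch and sequence are hypothetical names, and the list copying is just one way to keep the fold free of shared mutable state. Note that each element adds one intermediate future to the chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ReduceSketch {

    // Folds a list of futures into a single future of a list using thenCombine.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> seed = CompletableFuture.completedFuture(new ArrayList<>());
        return futures.stream().reduce(
                seed,
                // Accumulator: append the next result once both futures are done.
                (accFuture, next) -> accFuture.thenCombine(next, (acc, value) -> {
                    List<T> copy = new ArrayList<>(acc);
                    copy.add(value);
                    return copy;
                }),
                // Combiner: only used by parallel streams; concatenates partial lists.
                (left, right) -> left.thenCombine(right, (a, b) -> {
                    List<T> merged = new ArrayList<>(a);
                    merged.addAll(b);
                    return merged;
                }));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.completedFuture(3));
        System.out.println(sequence(futures).join()); // [1, 2, 3]
    }
}
```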

+0

Sorry, I don't understand what is meant by 'awaitAll' and Void? –

+0

I meant [allOf](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#allOf-java.util.concurrent.CompletableFuture...-). It returns a 'CompletableFuture<Void>', hence the next step to get the data back. – the8472

+0

I still don't understand. I tried jobDetailsListFuture.thenCompose(jobDetailsFutures -> { return CompletableFuture.allOf(jobDetailsFutures.toArray(new CompletableFuture[jobDetailsFutures.size()])).thenApply(aVoid -> {???}); }); but I don't know what to do with the Void. –