
Java 8 CompletableFuture web crawler doesn't crawl past one URL

I am playing with the new concurrency features introduced in Java 8, doing the exercises from Cay S. Horstmann's book "Java SE 8 for the Really Impatient". I created the following web crawler using the new CompletableFuture and jsoup. The basic idea is that, given a URL, the crawler finds the first m URLs on that page and then repeats the process n times; m and n are parameters, of course. The problem is that the program fetches the URLs of the initial page but doesn't recurse. What am I missing?

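// Static imports assumed, as the author confirms in the comments below:
// import static java.util.concurrent.CompletableFuture.allOf;
// import static java.util.concurrent.CompletableFuture.supplyAsync;
// import static java.util.stream.Collectors.toSet;
// import static java.util.stream.Stream.of;
// import static org.jsoup.Jsoup.connect;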
static class WebCrawler {
    CompletableFuture<Void> crawl(final String startingUrl,
            final int depth, final int breadth) {
        if (depth <= 0) {
            return completedFuture(startingUrl, depth);
        }

        final CompletableFuture<Void> allDoneFuture = allOf(
            (CompletableFuture[]) of(startingUrl)
                .map(url -> supplyAsync(getContent(url)))
                .map(docFuture -> docFuture.thenApply(getURLs(breadth)))
                .map(urlsFuture -> urlsFuture.thenApply(doForEach(depth, breadth)))
                .toArray(size -> new CompletableFuture[size]));

        allDoneFuture.join();

        return allDoneFuture;
    }

    private CompletableFuture<Void> completedFuture(
            final String startingUrl, final int depth) {
        LOGGER.info("Link: {}, depth: {}.", startingUrl, depth);

        CompletableFuture<Void> future = new CompletableFuture<>();
        future.complete(null);

        return future;
    }

    private Supplier<Document> getContent(final String url) {
        return () -> {
            try {
                return connect(url).get();
            } catch (IOException e) {
                throw new UncheckedIOException(
                    "Something went wrong trying to fetch the contents of the URL: "
                        + url, e);
            }
        };
    }

    private Function<Document, Set<String>> getURLs(final int limit) {
        return doc -> {
            LOGGER.info("Getting URLs for document: {}.", doc.baseUri());

            return doc.select("a[href]").stream()
                .map(link -> link.attr("abs:href")).limit(limit)
                .peek(LOGGER::info).collect(toSet());
        };
    }

    private Function<Set<String>, Stream<CompletableFuture<Void>>> doForEach(
            final int depth, final int breadth) {
        return urls -> urls.stream().map(
            url -> crawl(url, depth - 1, breadth));
    }
}

Test case:

@Test 
public void testCrawl() { 
    new WebCrawler().crawl(
     "http://en.wikipedia.org/wiki/Java_%28programming_language%29", 
     2, 10); 
} 

What are 'allOf' and 'of' in 'allOf((CompletableFuture[]) of(startingUrl)'?


What is 'Document'? Please post a reproducible example.


@SotiriosDelimanolis This is working code; 'allOf' and 'of' are static imports, and 'Document' is a jsoup class. I didn't want to clutter the post. Here is the full [code](https://github.com/abhijitsarkar/java/blob/master/java8-impatient/src/main/java/name/abhijitsarkar/java/java8impatient/concurrency/PracticeQuestionsCh6.java).

Answer


The problem is in the following code:

final CompletableFuture<Void> allDoneFuture = allOf(
    (CompletableFuture[]) of(startingUrl) 
    .map(url -> supplyAsync(getContent(url))) 
    .map(docFuture -> docFuture.thenApply(getURLs(breadth))) 
    .map(urlsFuture -> urlsFuture.thenApply(doForEach(depth, breadth))) 
    .toArray(size -> new CompletableFuture[size])); 

For some reason, you are doing all of this inside a stream of one element (is that part of the exercise?). The end result is that allDoneFuture does not track the completion of the subtasks; it tracks the completion of the Stream<CompletableFuture> coming from doForEach. But that stream is ready right away, and the futures inside it are never asked to complete: Stream.map is lazy and nothing ever consumes the stream, so crawl is never even invoked for the child URLs.
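To see the effect in isolation, here is a minimal self-contained sketch (the class name LazyStreamDemo and the string labels are made up for illustration): a future that merely wraps an unconsumed stream of futures completes immediately, without the inner tasks ever starting.

import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class LazyStreamDemo {
    public static void main(String[] args) {
        // The outer future completes as soon as the lambda returns the
        // Stream object. Stream.map is lazy and the stream is never
        // consumed, so runAsync is never called and nothing is "crawled".
        CompletableFuture<Stream<CompletableFuture<Void>>> outer =
            CompletableFuture.supplyAsync(() ->
                Stream.of("a", "b")
                    .map(s -> CompletableFuture.runAsync(
                        () -> System.out.println("crawling " + s))));

        outer.join();
        System.out.println("outer is done, but nothing was crawled");
    }
}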

The fix is to remove the stream, which is not doing anything useful:

final CompletableFuture<Void> allDoneFuture = supplyAsync(getContent(startingUrl))
    .thenApply(getURLs(breadth))
    .thenApply(doForEach(depth, breadth))
    .thenApply(futures -> futures.toArray(CompletableFuture[]::new))
    .thenCompose(CompletableFuture::allOf);
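A note on the last step: 'thenApply(CompletableFuture::allOf)' would yield a nested CompletableFuture<CompletableFuture<Void>>, so 'thenCompose' is used to flatten the nesting; that way allDoneFuture completes only after every child crawl has completed.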

Thanks. I had figured out the problem and solved it in a slightly different way; I will accept your answer because it is more reasonable. Unfortunately, code in comments looks ugly (reformatted below for readability): '(startingUrl).map(url -> supplyAsync(getContent(url))).map(docFuture -> docFuture.thenApply(getURLs(breadth))).map(urlsFuture -> urlsFuture.thenAccept(doForEach(depth, breadth))).findFirst().orElseThrow(completionException("Something went wrong while crawling the URL: " + startingUrl)).join();'
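Here is the commenter's variant laid out as a block (a sketch: the leading 'of(' is inferred from the question's code, 'completionException(String)' is assumed to be a helper from the author's linked repository supplying the exception for 'orElseThrow', and the commenter mentions further unshown modifications, presumably including adapting 'doForEach' to a Consumer for 'thenAccept'):

of(startingUrl)
    .map(url -> supplyAsync(getContent(url)))
    .map(docFuture -> docFuture.thenApply(getURLs(breadth)))
    .map(urlsFuture -> urlsFuture.thenAccept(doForEach(depth, breadth)))
    .findFirst()
    // completionException(String) is not shown in this post; it is
    // assumed to return the Supplier used when findFirst() is empty.
    .orElseThrow(completionException(
        "Something went wrong while crawling the URL: " + startingUrl))
    .join();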


Some more modifications to support the above were also made; I would show them, but not in a comment. By the way, in your answer 'thenCompose' can be changed to 'thenAccept', which I believe is more appropriate. They work the same in this case.