2017-06-26 43 views
-3

我用下面的用例切割我的牙齒上的駱駝:Apache的駱駝:使用自定義的處理器,分離器和聚合路由無法正常輸出

給定一個GitHub的用戶名,我希望獲取一定的公開的 回購以活動的降序排列,然後爲每個回購我想要 取一定數量的提交,最後,對於每次提交,我想要打印一些 信息。

爲了達到這個目的,我寫了一個Producer和下面的路線。生產者工作(我測試),沒有聚合器的路由也是如此。當使用聚合器時,什麼都沒有出現(我的測試失敗)。

public void configure() throws Exception { 
    from("direct:start") 
      .id("gitHubRoute") 
      .filter(and(
        isNotNull(simple("${header." + ENDPOINT + "}")), 
        isNotNull(simple("${body}"))) 
      ) 
      .setHeader(USERNAME, simple("${body}")) 
      .toD("github:repos?username=${body}") 
      .process(e -> { 
       // some processing 
      }) 
      .split(body()) 
      .parallelProcessing() 
      .setHeader(REPO, simple("${body.name}")) 
      .toD("github:commits" + 
        "?repo=${body.name}" + 
        "&username=${header." + USERNAME + "}" 
      ) 
      .process(e -> { 
       // some processing 
      }) 
      .split(body()) 
      .toD("github:commit" + 
        "?repo=${header." + REPO + "}" + 
        "&username=${header." + USERNAME + "}" + 
        "&sha=${body.sha}" 
      ) 
      .process(e -> { 
       // some processing 
      }) 
      .aggregate(header(REPO), new GroupedExchangeAggregationStrategy()).completionTimeout(10000l) 
      .toD("${header." + ENDPOINT + "}"); 

    from("direct:end") 
      .process().exchange(this::print); 
} 

在測試過程中,我ENDPOINT設置頭mock:result。實際上,它設置爲direct:end

我在做什麼錯?沒有錯誤,但print方法或測試期間的模擬從不被調用。

+1

在我看來,這條路線已經是一個測試太複雜了。把它分成2-3條更小的路線是不是更好?這些路線更容易測試每一步,並知道哪裏發生了什麼事情? –

+0

研究聚合體如何在更多方面起作用。我沒有時間在這裏解釋,但例如駱駝網站,書籍等。總之,聚合器是兩條腿,所以它的輸出獨立於輸入運行。 –

+2

我假設問題可能以您指示聚合完成的方式進行。使用completionInterval而不是completionTimeout來嘗試。你也可以使用completionSize。還有其他幾個選項可用,有關更多信息,請參閱http://camel.apache.org/aggregator2.html「關於完成」部分。 –

回答

0

我自己解決了。幾件我想改變的東西:

  1. 完成檢查:我使用了一個completionPredicate如下所示。
  2. eagerCheckCompletion():沒有這個,交換到completionPredicate是彙總交換,而不是傳入交換。

我也藉此機會做了很少的重構來提高可讀性。

public void configure() throws Exception { 
    from("direct:start") 
      .id("usersRoute") 
      .filter(isNotNull(simple("${header." + ENDPOINT + "}"))) 
      .setHeader(USERNAME, simple("${body}")) 
      .toD("github:users/${body}/repos") 
      .process(e -> this.<GitHub.Repository>limitList(e)) 
      .to("direct:reposRoute1"); 

    from("direct:reposRoute1") 
      .id("reposRoute1") 
      .split(body()) 
      .parallelProcessing() 
      .setHeader(REPO, simple("${body.name}")) 
      .toD("github:repos/${header." + USERNAME + "}" + "/${body.name}/commits") 
      .process(e -> this.<GitHub.Commit>limitList(e)) 
      .to("direct:reposRoute2"); 

    from("direct:reposRoute2") 
      .id("reposRoute2") 
      .split(body()) 
      .toD("github:repos/${header." + USERNAME + "}" + "/${header." + REPO + "}" + "/commits/${body.sha}") 
      .process(e -> { 
       GitHub.Commit commit = e.getIn().getBody(GitHub.Commit.class); 

       List<GitHub.Commit.File> files = commit.getFiles(); 
       if (!CollectionUtils.isEmpty(files) && files.size() > LIMIT) { 
        commit.setFiles(files.subList(0, LIMIT)); 
        e.getIn().setBody(commit); 
       } 
      }) 
      // http://camel.apache.org/aggregator2.html 
      .aggregate(header(REPO), new AggregateByRepoStrategy()) 
      .forceCompletionOnStop() 
      .eagerCheckCompletion() 
      .completionPredicate(header("CamelSplitComplete").convertTo(Boolean.class).isEqualTo(TRUE)) 
      .toD("${header." + ENDPOINT + "}"); 

    from("direct:end") 
      .process().exchange(this::print); 
} 

我作爲AggregationStrategy如下:

private static final class AggregateByRepoStrategy extends AbstractListAggregationStrategy<GitHub.Commit> { 
    @Override 
    public GitHub.Commit getValue(Exchange exchange) { 
     return exchange.getIn().getBody(GitHub.Commit.class); 
    } 
}