2017-09-14 70 views
0

我正在開發一個Spring Integration的文件解析系統。Spring集成:Scatter-Gather模式實現

該場景是下一個: 輪詢器從目錄中提取文件,然後我們應用一些轉換,解析等......最後,我們需要將數據存儲到兩個源(mongoDB和文件系統)中,並刪除原始文件。

我正在使用Scatter-Gather模式並行執行存儲操作。

問題:如何自動釋放組?並將彙總的結果傳遞給最終處理程序?

下面是一個代碼示例:

@Bean 
public IntegrationFlow processDomainFileFlow() { 
    return IntegrationFlows 
      .from("receiverChannel") 
      .scatterGather(scatterer -> scatterer 
           .recipientFlow(m -> true, subFlow -> subFlow.handle(new DataToMongoHandler())) 
           .recipientFlow(m -> true, subFlow -> subFlow.handle(new DataToFileStorageHandler())), 
         gatherer -> gatherer 
           .releaseStrategy(group -> group.size() == 2), 
        scatterGatherSpec -> scatterGatherSpec 
          .gatherChannel(MessageChannels.direct("gateway").get())) 
      .get(); 
} 

,這裏是一個最終處理程序:

@Bean 
public IntegrationFlow gatewayFlow(){ 
    return IntegrationFlows.from("gateway") 
      .handle(new DeleteOriginalFileHandler()) 
      .get(); 
} 

回答

0

添加aggregator(),將其配置爲釋放所述組時的大小爲2;在成功後更改子流以向聚合器發送消息。

+0

嘿加里,感謝您的意見。我瞭解原理,但無法弄清細節。我已經使用分散聚集方法更新了我的問題。發佈策略不適用。我的雙方都是無效的。我應該手動填充標題嗎?我也無法弄清楚如何將聚合結果重定向到最終處理程序。 –

+0

由於處理程序返回void,因此需要一些其他機制來生成結果 - 爲子流添加一對pub/sub通道,第二個訂閱者返回結果。如果處理程序失敗,則不會調用第二個用戶(默認情況下)。您可以向聚合器(收集器)添加超時。 –