0
我正在開發一個Spring Integration的文件解析系統。Spring集成:Scatter-Gather模式實現
該場景是下一個: 輪詢器從目錄中提取文件,然後我們應用一些轉換,解析等......最後,我們需要將數據存儲到兩個源(mongoDB和文件系統)中,並刪除原始文件。
我正在使用Scatter-Gather模式並行執行存儲操作。
問題:如何自動釋放組?並將彙總的結果傳遞給最終處理程序?
下面是一個代碼示例:
@Bean
public IntegrationFlow processDomainFileFlow() {
return IntegrationFlows
.from("receiverChannel")
.scatterGather(scatterer -> scatterer
.recipientFlow(m -> true, subFlow -> subFlow.handle(new DataToMongoHandler()))
.recipientFlow(m -> true, subFlow -> subFlow.handle(new DataToFileStorageHandler())),
gatherer -> gatherer
.releaseStrategy(group -> group.size() == 2),
scatterGatherSpec -> scatterGatherSpec
.gatherChannel(MessageChannels.direct("gateway").get()))
.get();
}
,這裏是一個最終處理程序:
@Bean
public IntegrationFlow gatewayFlow(){
return IntegrationFlows.from("gateway")
.handle(new DeleteOriginalFileHandler())
.get();
}
嘿加里,感謝您的意見。我瞭解原理,但無法弄清細節。我已經使用分散聚集方法更新了我的問題。發佈策略不適用。我的雙方都是無效的。我應該手動填充標題嗎?我也無法弄清楚如何將聚合結果重定向到最終處理程序。 –
由於處理程序返回void,因此需要一些其他機制來生成結果 - 爲子流添加一對pub/sub通道,第二個訂閱者返回結果。如果處理程序失敗,則不會調用第二個用戶(默認情況下)。您可以向聚合器(收集器)添加超時。 –