這是我早期問題Spring Integration File reading的延續。 總之,我有一個fileIn通道(一個隊列),然後是一個處理文件的ServiceActivator和一個用於保存文件的出站適配器。ServiceActivators的Spring Integration併發性
我想引入併發性來處理多線程中的消息。我使用java DSL(但不是Java8)。我能夠用下面的方式做到這一點...
@Bean
public MessageChannel fileInChannel() {
return MessageChannels.queue("fileIn").get();
}
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.handle(myFileProcessor, "processFile",
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool()));
}
})
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
這工作!我也試過以下
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(myFileProcessor)
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
這也行!我不知道它只是一種風格,或者一種方法比另一種更好。如果是這樣,哪種方法更好。其次,在上述情況下,「文件寫入」(即最後一步)是順序的還是將在不同的線程中工作。如果我還需要併發性,我應該在句柄(fileProcessor)和句柄(outBoundAdapter)之間引入另一個taskExecutor通道嗎? 最終outboundadapter將是一個遠程文件S3適配器,因此問題