使用Spring Integration Java DSL,我構建了一個流程,我正在與FileSplitter
同步處理文件。我已經能夠使用上AbstractFilePayloadTransformer
的setDeleteFiles
標誌要刪除的文件轉換每一行File
爲Message
進行後續處理之後,像這樣:Spring集成Java DSL流程Splitter/Aggregator在處理完所有行後刪除文件
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
這工作得很好,但速度很慢。所以我嘗試上述.split
後添加一個ExecutorChannel
,像這樣:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
但隨後上述刪除標誌不允許流成功完成刪除文件(S)之前,他們完全讀取。
如果我刪除標誌,我有可能耗盡本地文件系統,其中文件是從S3同步的。
我可以在上面介紹a)完全處理每個文件和b)從本地文件系統中刪除文件一旦完成?換句話說,是否有一種方法可以準確知道文件何時完全處理(何時線已通過池中的線程異步處理)?
如果你很好奇,這裏是我的FileToInputStreamTransformer
IMPL:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
UPDATE
那麼,如何在東西下游流動知道要問什麼?
順便說一句,如果我正確地按照你的意見,當我.split
與new FileSplitter(true, true)
更新上面,我得到
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
請參閱上面的我的更新。我該如何繼續? –
考慮在一側使用'aggregator'使用'PayloadTypeRouter'或'publishSubscribeChannel',在另一側使用'transform()'前使用'filter'。 –