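I would like to move remote (S)FTP files once the batch has successfully processed them, using Spring Integration and the Java DSL.
What would be the best way to achieve that?
- Add a step to the batch that moves the remote file?
- Or use the FTP outbound gateway and give it the mv command?
I tend to prefer the second solution, which lets the batch focus on its logic only, but I am having a hard time implementing it with the Java DSL.
I have read http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway and tried to implement it like this: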
@Bean
public MessageHandler ftpOutboundGateway() {
return Sftp.outboundGateway(SftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MV, "payload")
.localDirectory(new File("/home/blabla/"))
.get();
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(
Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\\.xml.mini$")
...
,
e -> e.id("sftpInboundAdapter")
.poller(
Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
.maxMessagesPerPoll(-1)
.advice(retryAdvice())
)
)
.enrichHeaders(h -> h
.header(FileHeaders.REMOTE_DIRECTORY,"/home/filedrop/")
.header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini")
.header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini")
)
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.transform(jobExecutionToFileStringTransformer())
.handle(ftpOutboundGateway())
.handle(logger())
.get();
}
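I know my headers should be dynamic, but I don't know how to do that, so for now I use the name of an existing file. I get this error message (it tries to delete the file in the destination directory!):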
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343)
at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482)
at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 94 more
Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211)
at org.springframework.integration.file.remote.RemoteFileTemplate$3.doInSessionWithoutResult(RemoteFileTemplate.java:300)
at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334)
... 100 more
Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file
at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83)
at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205)
... 103 more
Thanks for your help!
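A note on reading the trace: it shows SftpSession.rename calling SftpSession.remove, which suggests that when the first rename attempt fails, the session deletes the target and tries the rename again. If so, the "Failed to delete file" message may only mean that the initial rename failed (for example because the source path or the done/ directory does not exist), not that the gateway set out to delete anything. The sketch below imitates that fallback on the local filesystem with java.nio. `RenameFallbackSketch` and `renameWithFallback` are hypothetical names, and the logic is an assumption inferred from the trace, not the library's code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameFallbackSketch {

    /**
     * Moves 'from' to 'to'. If the first attempt fails for any reason,
     * deletes the target and retries once (assumed fallback behavior).
     */
    static void renameWithFallback(Path from, Path to) throws IOException {
        try {
            Files.move(from, to);
        } catch (IOException firstFailure) {
            try {
                // The fallback assumes the target is in the way and removes it.
                Files.delete(to);
            } catch (IOException deleteFailure) {
                // This is the message that surfaces, hiding the first failure.
                throw new IOException("Failed to delete file " + to, deleteFailure);
            }
            Files.move(from, to);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("filedrop");
        // Neither the source file nor the done/ directory exists here.
        Path missingSource = dir.resolve("OFFERS.xml.mini");
        Path target = dir.resolve("done").resolve("OFFERS.xml.mini");
        try {
            renameWithFallback(missingSource, target);
        } catch (IOException e) {
            // The reported error names the target, although the source was the problem.
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the reported error names the destination file even though the missing source caused the failure, so the remote source path and target directory are worth checking first.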
EDIT: I have since simplified the workflow a lot, but here is the solution to my previous question:
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(
Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\\.xml$")
...
,
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES)
.maxMessagesPerPoll(-1)
)
)
.enrichHeaders(h -> h
// headers necessary for moving remote files (ftpOutboundGateway)
.headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()")
.headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()")
.header(FileHeaders.REMOTE_DIRECTORY,"/home/blabla/")
// headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory)
.headerExpression(FileHeaders.ORIGINAL_FILE, "payload.getAbsolutePath()")
.headerExpression(FileHeaders.FILENAME, "payload.getName()")
)
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw(), e-> e.advice(retryAdvice()))
.<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()),
mapping -> mapping
.subFlowMapping("true", sf -> sf
.handle(org.springframework.batch.core.JobExecution.class,
(p, h) -> myServiceActivator.jobExecutionToString(p,
(String) h.get(FileHeaders.REMOTE_DIRECTORY),
(String) h.get(FileHeaders.REMOTE_FILE)))
.handle(ftpOutboundGateway())
.handle(Boolean.class,
(p, h) -> myServiceActivator.BooleanToString(p,
(String) h.get(FileHeaders.FILENAME)))
.handle(fileOutboundGateway_MoveToProcessedDirectory())
)
.subFlowMapping("false", sf -> sf
.channel("nullChannel")
)
)
.handle(logger())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(500).get();
}
@Bean
public MessageHandler ftpOutboundGateway() {
return Sftp
.outboundGateway(SftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MV,
"payload")
.renameExpression("headers['file_renameTo']").get();
}
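To make the dynamic headers easier to follow, here is a plain-Java sketch of what the headerExpression calls above compute for each polled file. `MoveHeadersSketch` and `moveHeaders` are hypothetical names used only for illustration. The key `file_renameTo` is the one used in renameExpression above; the other two keys are assumed to be the string values behind FileHeaders.REMOTE_DIRECTORY and FileHeaders.REMOTE_FILE.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;

public class MoveHeadersSketch {

    /**
     * Builds the headers needed for the remote move from the local file payload,
     * mirroring the SpEL expressions "payload.getName()" and
     * "'/home/blabla/done/' + payload.getName()".
     */
    static Map<String, Object> moveHeaders(File payload, String remoteDir, String doneDir) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("file_remoteDirectory", remoteDir);             // assumed key
        headers.put("file_remoteFile", payload.getName());          // assumed key
        headers.put("file_renameTo", doneDir + payload.getName());  // key from renameExpression
        return headers;
    }

    public static void main(String[] args) {
        // The local copy of the file, as the inbound adapter would hand it over.
        File local = new File("/tmp/local/OFFERS.xml");
        Map<String, Object> headers = moveHeaders(local, "/home/blabla/", "/home/blabla/done/");
        headers.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because the rename target is derived from the payload's file name, every file that matches the regex filter gets its own destination under done/ instead of the hard-coded name used in the first attempt.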
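I have to accomplish the same task. Could you describe the solution better ("answer your own question" is probably the better "Stack Overflow way" to do that)? Thanks! – lrkwz
Where does myServiceActivator come from? Which dependencies do you have in your pom? Thanks – lrkwz
Please don't use this code for what you are trying to do: it does not work, and I no longer do it that way. To answer your question, myServiceActivator is a component with methods annotated with @ServiceActivator, and it is autowired into my flow. You need spring-integration-sftp, spring-integration-file and spring-integration-java-dsl. Please also have a look at https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference. I may write this up later and update this post. Good luck in the meantime. – landbit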