2015-07-01 115 views
3

我想一旦該批次已成功處理使用Spring的集成和Java DSL的文件到遠程SFTP移動文件移動處理遠程S(FTP)文件。使用Java DSL

什麼會實現,最好的方法是什麼?

  1. 添加批量一個腳印地向前遠程文件?
  2. 或者使用FTP出站網關,並提供mv命令?

我傾向於更喜歡第二種解決方案,並讓批處理專注於邏輯,但我很難嘗試用java dsl實現它。

我讀過http://docs.spring.io/spring-integration/reference/html/ftp.html#ftp-outbound-gateway,並試圖實現這樣的:

@Bean 
public MessageHandler ftpOutboundGateway() { 
    return Sftp.outboundGateway(SftpSessionFactory(), 
      AbstractRemoteFileOutboundGateway.Command.MV, "payload") 
      .localDirectory(new File("/home/blabla/")) 
      .get(); 

} 

@Bean 
public IntegrationFlow ftpInboundFlow() { 
    return IntegrationFlows 
      .from(
       Sftp.inboundAdapter(SftpSessionFactory()) 
       .regexFilter(".*\\.xml.mini$") 
       ...    
       , 
       e -> e.id("sftpInboundAdapter") 
       .poller(
         Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES) 
         .maxMessagesPerPoll(-1) 
         .advice(retryAdvice()) 
         ) 
      ) 
      .enrichHeaders(h -> h 
        .header(FileHeaders.REMOTE_DIRECTORY,"/home/filedrop/") 
        .header(FileHeaders.REMOTE_FILE, "/home/filedrop/OFFERS.xml.mini") 
        .header(FileHeaders.RENAME_TO, "/home/filedrop/done/OFFERS.xml.mini") 
      ) 
      .transform(fileToJobLaunchRequestTransformer())   
      .handle(jobLaunchingGw())) 
      .transform(jobExecutionToFileStringTransformer()) 
      .handle(ftpOutboundGateway()) 
      .handle(logger()) 
      .get(); 
} 

我知道我的標題應該是動態的,但我不知道該怎麼做,所以現在我使用現有的名稱文件。我得到這個錯誤信息(他試圖刪除目標目錄中的文件!):

Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file  at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:343) 
    at org.springframework.integration.file.remote.RemoteFileTemplate.rename(RemoteFileTemplate.java:290) 
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.doMv(AbstractRemoteFileOutboundGateway.java:482) 
    at org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway.handleRequestMessage(AbstractRemoteFileOutboundGateway.java:400) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    ... 94 more 
Caused by: org.springframework.core.NestedIOException: Failed to delete file /home/filedrop/done/OFFERS.xml.mini; nested exception is org.springframework.core.NestedIOException: Failed to remove file: 2: No such file 
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:211) 
    at org.springframework.integration.file.remote.RemoteFileTemplate$3.doInSessionWithoutResult(RemoteFileTemplate.java:300) 
    at org.springframework.integration.file.remote.SessionCallbackWithoutResult.doInSession(SessionCallbackWithoutResult.java:34) 
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:334) 
    ... 100 more 
Caused by: org.springframework.core.NestedIOException: Failed to remove file: 2: No such file 
    at org.springframework.integration.sftp.session.SftpSession.remove(SftpSession.java:83) 
    at org.springframework.integration.sftp.session.SftpSession.rename(SftpSession.java:205) 
    ... 103 more 

感謝您的幫助!

編輯 工作流程,我則簡化了很多,但這裏myprevious問題的解決方案:

@Bean 
public IntegrationFlow ftpInboundFlow() { 
    return IntegrationFlows 
      .from(
       Sftp.inboundAdapter(SftpSessionFactory()) 
       .regexFilter(".*\\.xml$") 
       ... 
       , 
       e -> e.id("sftpInboundAdapter") 
       .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES) 
         .maxMessagesPerPoll(-1) 
         ) 
      ) 
      .enrichHeaders(h -> h 
        // headers necessary for moving remote files (ftpOutboundGateway) 
        .headerExpression(FileHeaders.RENAME_TO, "'/home/blabla/done/' + payload.getName()") 
        .headerExpression(FileHeaders.REMOTE_FILE, "payload.getName()") 
        .header(FileHeaders.REMOTE_DIRECTORY,"/home/blabla/") 
        // headers necessary for moving local files (fileOutboundGateway_MoveToProcessedDirectory) 
        .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.getAbsolutePath()") 
        .headerExpression(FileHeaders.FILENAME, "payload.getName()") 
      ) 
      .transform(fileToJobLaunchRequestTransformer())   
      .handle(jobLaunchingGw(), e-> e.advice(retryAdvice())) 

      .<JobExecution, Boolean>route(p -> BatchStatus.COMPLETED.equals(p.getStatus()), 
              mapping -> mapping 
              .subFlowMapping("true", sf -> sf 


               .handle(org.springframework.batch.core.JobExecution.class, 
                 (p, h) -> myServiceActivator.jobExecutionToString(p, 
                   (String) h.get(FileHeaders.REMOTE_DIRECTORY), 
                   (String) h.get(FileHeaders.REMOTE_FILE))) 
               .handle(ftpOutboundGateway()) 
               .handle(Boolean.class, 
                 (p, h) -> myServiceActivator.BooleanToString(p, 
                   (String) h.get(FileHeaders.FILENAME))) 
               .handle(fileOutboundGateway_MoveToProcessedDirectory()) 

                        ) 


             .subFlowMapping("false", sf -> sf 
              .channel("nullChannel")  

              ) 
      ) 

      .handle(logger()) 
      .get(); 
} 

@Bean(name = PollerMetadata.DEFAULT_POLLER) 
public PollerMetadata poller() { 
    return Pollers.fixedRate(500).get(); 
} 


@Bean 
public MessageHandler ftpOutboundGateway() { 
    return Sftp 
      .outboundGateway(SftpSessionFactory(), 
        AbstractRemoteFileOutboundGateway.Command.MV, 
        "payload") 
      .renameExpression("headers['file_renameTo']").get(); 
} 
+0

我有完成相同的任務。 你能更好地描述解決方案嗎(「回答你自己的問題」可能更好的「堆棧溢出方式」來做到這一點)。 Thnks! – lrkwz

+0

myServiceActivator從哪裏來?你的pom中有哪些依賴關係?謝謝 – lrkwz

+0

請不要使用此代碼做你想做的事,它不工作,我不再那樣做。回答你的問題myServiceActivator是一個帶有註解@ServiceActivator方法的組件,並且在我的流程中自動裝配。你需要spring-integration-sftp spring-integration-file spring-integration-java-dsl。也請看看https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference。我可能會在後面寫這篇文章並更新這篇文章。同時祝你好運。 – landbit

回答

1

也許你沒有權限做重命名或重命名失敗出於某種其他原因;此例外是嘗試刪除「到」文件名,因爲初始重命名失敗。打開調試日誌,你應該看到這篇日誌...

if (logger.isDebugEnabled()){ 
    logger.debug("Initial File rename failed, possibly because file already exists. Will attempt to delete file: " 
      + pathTo + " and execute rename again."); 
} 
try { 
    this.remove(pathTo); 

由於故障在此remove()操作,你的失敗表明,重命名失敗,因爲一些其他原因(因爲明確了「到」文件不不存在)。

+0

謝謝,事情是我不知道該怎麼怪錯路徑?權限?流量配置?然後,我實現它在XML文檔地方和樣品都遠遠比Java DSL更好(例子是有時過於簡單的海事組織新來者,也還是用'糊塗@ Transformer','.transform()','手柄()'和' @ ServiceActivator「轉換消息的最佳實踐是什麼),不知何故,它讓我明白了一些事情,並且切換到java dsl更容易。我更新了我的帖子,希望這可以幫助未來的人。無論如何,感謝讓開發人員的生活更輕鬆:) – landbit

+0

XML已經有很長的時間了;我們正在努力在時間允許的情況下改進Java Config/DSL文檔。如果您對文檔或示例改進有任何建議,請創建一個JIRA [此處](https://jira.spring.io/browse/INT)或[here](https://jira.spring.io/browse/INTSAMPLES ) 分別。如果你想[貢獻](https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md)會更好。 –

+0

Dsl似乎比xml配置好得多,它簡化了它...但它缺乏來自新手觀點的文檔。這樣的真實世界的例子會有很大的幫助(諸如「從ftp讀取文件,逐行處理文件,發送確認郵件,移動遠程ftp文件」) – lrkwz