2

我有一個非常簡單的Spring Boot應用程序,它提供了幾個寧靜的端點,這應該是驅動一個sftp文件上傳到sftp服務器。我的要求是,如果有多個文件,文件應該排隊。我期望通過sftp spring集成工作流程的默認行爲來實現這一點,因爲我讀到了DirectChannel自動排隊文件。要測試行爲,我執行以下操作:Spring集成中的REST端點使消息傳遞通道多線程

  1. 發送一個大文件,通過調用端點阻塞通道一段時間。
  2. 通過調用端點來發送較小的文件。

預期結果:較小的文件排隊到一個通道上並在較大的文件上載完成後處理。 實際結果:打開了一個到sftp服務器的新連接,較小的文件上傳到那裏而不排隊,而較大的文件繼續傳輸。

有我的應用程序兩個文件:

DemoApplication.java

@SpringBootApplication 
@IntegrationComponentScan 
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class}) 
public class DemoApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(DemoApplication.class, args); 
    } 

    @Bean 
    public SessionFactory<LsEntry> sftpSessionFactory() { 
     DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); 
     factory.setHost("localhost"); 
     factory.setPort(22); 
     factory.setUser("tester"); 
     factory.setPassword("password"); 
     factory.setAllowUnknownKeys(true); 
     return factory; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "toSftpChannel") 
    public MessageHandler handler() { 
     SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); 
     handler.setRemoteDirectoryExpression(new LiteralExpression("/")); 
     return handler; 
    } 

    @MessagingGateway 
    public interface MyGateway { 

     @Gateway(requestChannel = "toSftpChannel") 
     void sendToSftp(File file); 
    } 
} 

DemoController.java

@RestController 
public class DemoController { 

    @Autowired 
    MyGateway gateway; 

    @RequestMapping("/sendFile") 
    public void sendFile() { 
     File file = new File("C:/smallFile.txt"); 
     gateway.sendToSftp(file); 
    } 

    @RequestMapping("/sendBigFile") 
    public void sendBigFile() { 
     File file = new File("D:/bigFile.zip"); 
     gateway.sendToSftp(file); 
    } 
} 

我一個完整的新手去春來,我不知道完全是我的sftp頻道在這裏得到了正確的創建,我的猜測是每當我做一次sendToSftp調用時都會創建一個新頻道。值得讚賞的是,在這種情況下如何實現隊列行爲的任何幫助。

回答

2

您在這裏沒有隊列,因爲每個HTTP請求都在其自己的線程中執行。沒錯,你可能在http線程池已經耗盡時無論如何都排隊了,但是這並沒有在你的簡單用例中看到只有兩個請求。

無論如何,您可以實現隊列行爲,但您應該聲明toSftpChannelQueueChannel bean。

這種方式下游過程將始終在同一個線程上執行,並且下一個消息在第一個消息之後恰好從隊列中拉出。

查看Reference Manual瞭解更多信息。

UPDATE

由於您使用FtpMessageHandler這是單向的組成部分,但你仍然需要一些答覆MVC控制器的方法,只有做到這一點的方法是有一個@Gateway方法與非void回報,當然我們需要以某種方式發送回覆。

爲此,我建議使用PublishSubscribeChannel

@Bean 
@BridgeTo 
public MessageChannel toSftpChannel() { 
    return new PublishSubscribeChannel(); 
} 

@Bean 
@ServiceActivator(inputChannel = "toSftpChannel") 
@Order(0) 
public MessageHandler handler() { 
    SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); 
    handler.setRemoteDirectoryExpression(new LiteralExpression("/")); 
    return handler; 
} 

這樣,我們有兩個用戶的toSftpChannel。通過@Order(0)我們確保@ServiceActivator是第一個訂戶,因爲我們需要首先執行SFTP傳輸。隨着@BridgeTo我們增加第二BridgeHandler到相同PublishSubscribeChannel。它的目的只是爲了獲得一個replyChannel頭,並在那裏發送請求消息。由於我們不使用任何線程,因此在完成傳輸到SFTP之後,將執行BridgeHandler

當然代替BridgeHandler你可以有任何其他@ServiceActivator@Transfromer返回的答覆不是請求File,但別的。例如:

@ServiceActivator(inputChannel = "toSftpChannel") 
@Order(1) 
public String transferComplete(File payload) { 
    return "The SFTP transfer complete for file: " + payload; 
} 
+0

嗨,所以我有一個建立在這個問題。我怎樣才能讓sendToSftp返回一些轉移狀態?我嘗試將void更改爲String或Object,將響應通道或錯誤通道添加到消息傳遞網關,但沒有任何結果,有些幫助將不勝感激 – user2334207

+0

請參閱我的答案中的UPDATE。 –

+0

如何捕獲異常?我試着聯繫一個沒有權限的非現有服務器或文件夾,然後獲得一個大量的堆棧跟蹤,它似乎在頂部顯示MessagingException。我在網關的sendToSftp上添加了MessagingException,並且在DemoController中包含了try catch,但是如果發生異常,sendToSftp根本不會返回 – user2334207