Spring集成中的REST端点使消息传递通道多线程

问题描述:

我有一个非常简单的Spring Boot应用程序,它提供了几个宁静的端点,这应该是驱动一个sftp文件上传到sftp服务器。我的要求是,如果有多个文件,文件应该排队。我期望通过sftp spring集成工作流程的默认行为来实现这一点,因为我读到了DirectChannel自动排队文件。要测试行为,我执行以下操作:Spring集成中的REST端点使消息传递通道多线程

  1. 发送一个大文件,通过调用端点阻塞通道一段时间。
  2. 通过调用端点来发送较小的文件。

预期结果:较小的文件排队到一个通道上并在较大的文件上载完成后处理。 实际结果:打开了一个到sftp服务器的新连接,较小的文件上传到那里而不排队,而较大的文件继续传输。

有我的应用程序两个文件:

DemoApplication.java

@SpringBootApplication 
@IntegrationComponentScan 
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class}) 
public class DemoApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(DemoApplication.class, args); 
    } 

    @Bean 
    public SessionFactory<LsEntry> sftpSessionFactory() { 
     DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); 
     factory.setHost("localhost"); 
     factory.setPort(22); 
     factory.setUser("tester"); 
     factory.setPassword("password"); 
     factory.setAllowUnknownKeys(true); 
     return factory; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "toSftpChannel") 
    public MessageHandler handler() { 
     SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); 
     handler.setRemoteDirectoryExpression(new LiteralExpression("/")); 
     return handler; 
    } 

    @MessagingGateway 
    public interface MyGateway { 

     @Gateway(requestChannel = "toSftpChannel") 
     void sendToSftp(File file); 
    } 
} 

DemoController.java

@RestController 
public class DemoController { 

    @Autowired 
    MyGateway gateway; 

    @RequestMapping("/sendFile") 
    public void sendFile() { 
     File file = new File("C:/smallFile.txt"); 
     gateway.sendToSftp(file); 
    } 

    @RequestMapping("/sendBigFile") 
    public void sendBigFile() { 
     File file = new File("D:/bigFile.zip"); 
     gateway.sendToSftp(file); 
    } 
} 

我一个完整的新手去春来,我不知道完全是我的sftp频道在这里得到了正确的创建,我的猜测是每当我做一次sendToSftp调用时都会创建一个新频道。值得赞赏的是,在这种情况下如何实现队列行为的任何帮助。

您在这里没有队列,因为每个HTTP请求都在其自己的线程中执行。没错,你可能在http线程池已经耗尽时无论如何都排队了,但是这并没有在你的简单用例中看到只有两个请求。

无论如何,您可以实现队列行为,但您应该声明toSftpChannelQueueChannel bean。

这种方式下游过程将始终在同一个线程上执行,并且下一个消息在第一个消息之后恰好从队列中拉出。

查看Reference Manual了解更多信息。

UPDATE

由于您使用FtpMessageHandler这是单向的组成部分,但你仍然需要一些答复MVC控制器的方法,只有做到这一点的方法是有一个@Gateway方法与非void回报,当然我们需要以某种方式发送回复。

为此,我建议使用PublishSubscribeChannel

@Bean 
@BridgeTo 
public MessageChannel toSftpChannel() { 
    return new PublishSubscribeChannel(); 
} 

@Bean 
@ServiceActivator(inputChannel = "toSftpChannel") 
@Order(0) 
public MessageHandler handler() { 
    SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); 
    handler.setRemoteDirectoryExpression(new LiteralExpression("/")); 
    return handler; 
} 

这样,我们有两个用户的toSftpChannel。通过@Order(0)我们确保@ServiceActivator是第一个订户,因为我们需要首先执行SFTP传输。随着@BridgeTo我们增加第二BridgeHandler到相同PublishSubscribeChannel。它的目的只是为了获得一个replyChannel头,并在那里发送请求消息。由于我们不使用任何线程,因此在完成传输到SFTP之后,将执行BridgeHandler

当然代替BridgeHandler你可以有任何其他@ServiceActivator@Transfromer返回的答复不是请求File,但别的。例如:

@ServiceActivator(inputChannel = "toSftpChannel") 
@Order(1) 
public String transferComplete(File payload) { 
    return "The SFTP transfer complete for file: " + payload; 
} 
+0

嗨,所以我有一个建立在这个问题。我怎样才能让sendToSftp返回一些转移状态?我尝试将void更改为String或Object,将响应通道或错误通道添加到消息传递网关,但没有任何结果,有些帮助将不胜感激 – user2334207

+0

请参阅我的答案中的UPDATE。 –

+0

如何捕获异常?我试着联系一个没有权限的非现有服务器或文件夹,然后获得一个大量的堆栈跟踪,它似乎在顶部显示MessagingException。我在网关的sendToSftp上添加了MessagingException,并且在DemoController中包含了try catch,但是如果发生异常,sendToSftp根本不会返回 – user2334207