Spring集成中的REST端点使消息传递通道多线程
我有一个非常简单的Spring Boot应用程序,它提供了几个宁静的端点,这应该是驱动一个sftp文件上传到sftp服务器。我的要求是,如果有多个文件,文件应该排队。我期望通过sftp spring集成工作流程的默认行为来实现这一点,因为我读到了DirectChannel自动排队文件。要测试行为,我执行以下操作:Spring集成中的REST端点使消息传递通道多线程
- 发送一个大文件,通过调用端点阻塞通道一段时间。
- 通过调用端点来发送较小的文件。
预期结果:较小的文件排队到一个通道上并在较大的文件上载完成后处理。 实际结果:打开了一个到sftp服务器的新连接,较小的文件上传到那里而不排队,而较大的文件继续传输。
有我的应用程序两个文件:
DemoApplication.java
@SpringBootApplication
@IntegrationComponentScan
@EnableAutoConfiguration(exclude={DataSourceAutoConfiguration.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("tester");
factory.setPassword("password");
factory.setAllowUnknownKeys(true);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
DemoController.java
@RestController
public class DemoController {
@Autowired
MyGateway gateway;
@RequestMapping("/sendFile")
public void sendFile() {
File file = new File("C:/smallFile.txt");
gateway.sendToSftp(file);
}
@RequestMapping("/sendBigFile")
public void sendBigFile() {
File file = new File("D:/bigFile.zip");
gateway.sendToSftp(file);
}
}
我一个完整的新手去春来,我不知道完全是我的sftp频道在这里得到了正确的创建,我的猜测是每当我做一次sendToSftp调用时都会创建一个新频道。值得赞赏的是,在这种情况下如何实现队列行为的任何帮助。
您在这里没有队列,因为每个HTTP请求都在其自己的线程中执行。没错,你可能在http线程池已经耗尽时无论如何都排队了,但是这并没有在你的简单用例中看到只有两个请求。
无论如何,您可以实现队列行为,但您应该声明toSftpChannel
为QueueChannel
bean。
这种方式下游过程将始终在同一个线程上执行,并且下一个消息在第一个消息之后恰好从队列中拉出。
查看Reference Manual了解更多信息。
UPDATE
由于您使用FtpMessageHandler
这是单向的组成部分,但你仍然需要一些答复MVC控制器的方法,只有做到这一点的方法是有一个@Gateway
方法与非void
回报,当然我们需要以某种方式发送回复。
为此,我建议使用PublishSubscribeChannel
:
@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("/"));
return handler;
}
这样,我们有两个用户的toSftpChannel
。通过@Order(0)
我们确保@ServiceActivator
是第一个订户,因为我们需要首先执行SFTP传输。随着@BridgeTo
我们增加第二BridgeHandler
到相同PublishSubscribeChannel
。它的目的只是为了获得一个replyChannel
头,并在那里发送请求消息。由于我们不使用任何线程,因此在完成传输到SFTP之后,将执行BridgeHandler
。
当然代替BridgeHandler
你可以有任何其他@ServiceActivator
或@Transfromer
返回的答复不是请求File
,但别的。例如:
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(File payload) {
return "The SFTP transfer complete for file: " + payload;
}
嗨,所以我有一个建立在这个问题。我怎样才能让sendToSftp返回一些转移状态?我尝试将void更改为String或Object,将响应通道或错误通道添加到消息传递网关,但没有任何结果,有些帮助将不胜感激 – user2334207
请参阅我的答案中的UPDATE。 –
如何捕获异常?我试着联系一个没有权限的非现有服务器或文件夹,然后获得一个大量的堆栈跟踪,它似乎在顶部显示MessagingException。我在网关的sendToSftp上添加了MessagingException,并且在DemoController中包含了try catch,但是如果发生异常,sendToSftp根本不会返回 – user2334207