Spring Boot实战(九)9.4 系统集成Spring Integration

9.4.1 Spring Integration快速入门

Spring Integration提供了基于Spring的EIP(Enterprise Integration Patterns,企业集成模式的实现。Spring Integration主要解决的问题是不同系统之间交互的问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。本节将基于无XML配置的原则使用Java配置、注解以及Spring Integration Java DSL来使用Spring Integration。

9.4.2 Message

Message是用来在不同部分之间传递的数据。Message由两部分组成:消息体(payload)与消息头(header)。消息体可以是任何数据类型(如XML、JSON,Java对象);消息头表示的元数据就是解释消息体的内容。

public interface Message<T> {
	T getPayload();
	MessageHeaders getHeaders();
}

9.4.3 Channel

在消息系统中,消息发送者发送消息到通道(Channel),消息收受者从通道(Channel)接收消息。

1.*接口

(1)MessageChannel

MessageChannel是Spring Integration消息通道的*接口:

public interface MessageChannel{
	public static final long INDEFINITE_TIMEOUT = -1;
	boolean send(Message<?>message);
	boolean send(Message<?>message,long timeout);
}

当使用send方法发送消息时,返回值为true,则表示消息发送成功。MessageChannel有两大子接口,分别为PollableChannel(可轮询)和SubscribableChannel(可订阅)。我们所有的消息通道类都是实现这两个接口。

(2)PollableChannel

PollableChannel具备轮询获得消息的能力,定义如下:

public interface PollableChannel extends MessageChannel {
	Message<?> receive();
	Message<?> receive(long timeout);
}
(3)SubscribableChannel

SubscribableChannel发送消息给订阅了MessageHanlder的订阅者:

public interface SubscribableChannel extends MessageChannel{
	boolean subscibe(MessageHandler handler);
	boolean unsubscribe(MessageHandler handler);
}

2.常用消息通道

(1)PublishSubScribeChannel

PublishSubscribeChannel允许广播消息给所有订阅者,配置方式如下:

@Bean
public PublishSubscribeChannel publishSubscribeChannel() {
	PublishSubscribeChannel channel = new PublishSubscribeChannel();
	return channel;
}

其中,当前消息通道的id为publishSubscribeChannel。
(2)QueueChannel
QueueChannel允许消息接收者轮询获得信息,用一个队列(queue)接收消息,队列的容量大小可配置,配置方式如下:

@Bean
public QueueChannel queueChannel(){
	QueueChannel channel = new QueueChannel(10);
	return channel
}

其中QueueChannel构造参数10即为队列的容量。

(3)PriorityChannel

PriorityCHannel可按照优先级将数据存储到对,它依据于消息的消息头priority属性,配置方式如下:

@Bean
public PriorityChannel priorityChannel(){
	PriorityChannel channel = new PriorityChannel(10);
	return channel;
}
(4)RendezvousChannel

RendezvousChannel确保每一个接收者都接收到消息后再发送消息,配置方式如下:

@Bean
public RendezvousChannel rendezvousChannel(){
	RendezvousChannel channel = new RendezvousChannel();
	return channel;
}
(5)DirectChannel

DirectChannel是Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收,配置方式如下

@Bean
public DirectChannel directChannel(){
	DirectChannel channel = new DirectChannel();
	return channel;
}
(6)ExecutorChannel

ExecutorChannel可绑定一个多线程的task executor,配置方式如下:

@Bean
public ExecutorChannel executorChannel(){
	ExecutorChannel channel = new ExecutorChannel(executor());
	return channel;
}

@Bean
public Executor executor(){
	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
	taskExecutor.setCorePoolSize(5);
	taskExecutor.setMaxPoolSize(10);
	taskExecutor.setQueueCapacity(25);
	taskExecutor.initialize();
	return taskExecutor;
}

3.通道拦截器

Spring Integration给消息通道提供了通道拦截器(ChannelInterceptor),用来拦截发送和接收消息的操作。
ChannelInterceptor接口定义如下,我们只需实现这个接口即可:

public interface ChannelInterceptor {
	Message<?>preSend(Message<?>message,MessageChannel channel);
	void postSend(Message<?>message,MessageChannel channel,boolean sent);
	void afterSendCompletion(Message<?>message,MessageChannel channel,boolean sent,Exception ex);
	boolean preReceive(MessageChannel channel);
	Message<?>postReceive(Message<?>messaeg,MessageChannel channel);
	void afterReceiveCompletion(Message<?>message,MessageChannel channel,Exception ex);
}

我们通过下面的代码给所有的channel增加拦截器:

channel.addInterceptor(someInterceptor);

9.4.4 Message EndPoint

消息端点(Message Endpoint)是真正处理消息的(Message)组件,它还可以控制通道的路由。我们可用的消息端点包含如下:

(1)Channel Adapter

通道适配器(Channel Adapter)是一种连接外部系统或传输协议的端点(EndPoint),可以分入站(inbound)和出站(outbound)。
通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。
Spring Integration内置如下的适配器:
RabbitMQ、Feed、File、FTPd/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI、Twitter、XMPP、WebServices(SOAP、RESTaurant)、WebSocket等。

(2)Gateway

消息网关(Gateway)类似于Adapter,但是提供了双向的请求/返回集成方式,也分为入站(inbound)和出站(outbound)。Spring Integration对相应的Adapter多都提供了Gatew。

(3)Service Activator

Service Activator可调用Spring的Bean来处理消息,并将处理后的结果输出到指定的消息通道。

(4)Router

路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。

(5)Filter

过滤器(Filter)类似于路由(Router),不同的是过滤器不决定消息路由到哪里,而是决定消息是否可以传递给消息通道。

(6)Splitter

拆分器(Splitter)将消息拆分为几个部分单独处理,拆分器处理的返回值是一个集合或者数组。

(7)Aggregator

聚合器(Aggregator)与拆分器相反,它接收一个java.util.List作为参数,将多个消息合并为一个消息。

(8)Enricher

当我们从外部获得消息后,需要增加额外的消息到已有的消息中,这时就需要使用消息增强器(Enricher)。消息增强器主要有消息体增强器(Payload Enricher)和消息头增强器(Header Enricher)两种。

(9)Transformer

转换器(Transformer)是地获得的消息进行一定的逻辑转换处理(如数据格式转换)。

(10)Bridge

使用连接桥(Bridge)可以简单地将两个消息通道连接起来。

9.4.5 Spring Integration Java DSL

Spring Integration提供了一个IntegrationFlow来定义系统继承流程,而通过IntegrationFlows和IntegrationFlowBuilder来实现使用Fluent API来定义流程。在Fulent API里,分别提供了下面方法来映射Spring Integration的端点(EndPoint)。

transform() -> Transformer
filter() -> Filter
handle() -> ServiceActivator、Adapter、Gateway
split() -> Splitter
aggregate() -> Aggregator
route() -> Router
bridge() -> Bridge

一个简单的流程定义如下:

@Bean
public IntegrationFlow demoFlow(){
 return IntegrationFlows.from("input")  //从Channel  input获取消息
 	.<String,Integer>transform(Integer::parseint) //将消息转换成整数
 	.get();  //获得集成流程并注册为Bean
}

9.4.6 实战

本章将演示读取https://spring.io/blog.atom的新闻聚合文件,atom是一种xml文件,且格式是固定的,示例如下:

<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
	<title>Spring</title>
	<link rel="alternate" href="https://spring.io/blog" />
	<link rel="self" href="https://spring.io/blog.atom" />
	<id>http://spring.io/blog.atom</id>
	<icon>https://spring.io/favicon.ico</icon>
	<upadated>2015-07-29T14:46:00z</upadated>
	<entry>
		<title>Spring Cloud Connectors 1.2.0 released</title>
		<link rel="alternate" href="http://..." />
		<category term="releases" label="Releases" />
		<author>
			<name>some author</name>
		</author>
		<id>tag:spring.io,2015-07-27:2196</id>
		<updated>2015-07-29T14:46:00z</updated>
		<content type="html">...</content>
	</entry>
</feed>

我们将读取到消息通过分类(Category),将消息转到不同的消息通道,将分类为releases和engineering的消息写入磁盘文件,将分类为news的消息通过邮件发送。

1.新建Spring Boot项目

新建Spring Boot项目,依赖为Integration(spring-boot-starter-integration)和mail(spring-boot-starter-mail)。
项目信息

groupId:com.wisely
arctifactId:ch9_4
package:com.wisely.ch9_4

另外,我们还要添加Spring Integration对atom及mail的支持。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.20</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.wisely</groupId>
	<artifactId>ch9_4</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>ch9_3_5-1</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-mail</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-feed</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mail</artifactId>
		</dependency>
		
		<dependency>
		    <groupId>org.springframework.integration</groupId>
		    <artifactId>spring-integration-java-dsl</artifactId>
		    <version>1.2.3.RELEASE</version>
		</dependency>
		
		<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-file -->
		<dependency>
		    <groupId>org.springframework.integration</groupId>
		    <artifactId>spring-integration-file</artifactId>
		</dependency>
				
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>


本例所有的代码都在入口类中完成。

2.读取流程

package com.wisely.ch9_4;

import java.io.IOException;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.scheduling.PollerMetadata;

import com.rometools.rome.feed.synd.SyndEntry;

@SpringBootApplication
public class Ch9351Application {
	
	@Value("https://spring.io/blog.atom")  //通过@Value注解自动获得https://spring.id/blog.atom的资源
	Resource resource;
	
	@Bean(name = PollerMetadata.DEFAULT_POLLER)
	public PollerMetadata poller() {  //使用Fluent API和Pollers配置默认的轮询方式
		return Pollers.fixedRate(500).get();
	}
	
	@Bean
	public FeedEntryMessageSource feedEntryMessageSource()throws IOException{ //FeedEntryMessageSource实际为feed:inbound-channel-adapter,此处即构造feed的入站通道适配器作为数据输入
		FeedEntryMessageSource messageSource = new FeedEntryMessageSource(resource.getURL(), "news");
		return messageSource;
	}
	
	@Bean
	public IntegrationFlow myFlow() throws IOException {
		return IntegrationFlows.from(feedEntryMessageSource())  //流程从from方法开始
				.<SyndEntry,String>route(payload ->
				payload.getCategories().get(0).getName(),  //通过路由方法route来选择路由,消息体(payload)的类型为SyndEntry,作为判断条件的类型为String,判断的值是通过payload获得的分类(Categroy);
				mapping -> mapping.channelMapping("releases",
						"releasesChannel")  //通过不同分类的值转向不同的消息通道,若分类为releases,则转向releasesChannel;
				.channelMapping("engineering",
						"engineeringChannel")
				.channelMapping("news","newsChannel"))
				.get();   //通过get方法获得IntegrationFlow实体,配置为Spring的Bean。
	}

	public static void main(String[] args) {
		SpringApplication.run(Ch9351Application.class, args);
	}

}

3.releases流程

	@Bean
	public IntegrationFlow releasesFlow() {
		return IntegrationFlows.from(MessageChannels.queue("releasesChannel", 10)) //从消息通道releasesChannel开始获取数据。
				.<SyndEntry, String> transform(
						payload -> "《" + payload.getTitle() + "》 " + payload.getLink() + getProperty("line.separator")) //使用transform方法进行数据转换。payload类型为SyndEntry,将其转换为字符串类型,并自定义数据的格式。
				.handle(Files.outboundAdapter(new File("e:/springblog")) //用handle方法处理file的出站适配器。Files类是由Spring Integration Java DSL提供的 Fluent API用来构造文件输出的适配器。
						.fileExistsMode(FileExistsMode.APPEND) 
						.charset("UTF-8") 
						.fileNameGenerator(message -> "releases.txt") 
						.get())
				.get();
	}

4.engineering流程

	@Bean
	public IntegrationFlow engineeringFlow() {
		return IntegrationFlows.from(MessageChannels.queue("engineeringChannel", 10))
				.<SyndEntry, String> transform(
						payload -> "《" + payload.getTitle() + "》" + payload.getLink() + getProperty("line.separator"))
				.handle(Files.outboundAdapter(new File("e:/springblog"))
						.fileExistsMode(FileExistsMode.APPEND)
						.charset("UTF-8")
						.fileNameGenerator(message -> "engineering.txt")
						.get())
				.get();
	}
	}

5.news流程

	@Bean
	public IntegrationFlow newsFlow() {
		return
				IntegrationFlows.from(MessageChannels.queue("newsChannel",10))
				.<SyndEntry,String>transform(
						payload -> "《" + payload.getTitle() + "》" +
				payload.getLink() + getProperty("line.separator"))
				.enrichHeaders(                   //通过enricherHeader来增加消息头的信息
						Mail.headers()
						.subject("来自Spring的新闻")
						.to("[email protected]")
						.from("[email protected]"))
				.handle(Mail.outboundAdapter("smtp.qq.com")  //邮件发送的相关信息通过Spring Integration JavaDSL提供的Mail的headers方法来构造。
						.port(25)
						.protocol("smtp")
						.credentials("[email protected]", "******") //使用handle方法来定义邮件发送的出站适配器,使用Spring Integration Java DSL提供的Mail。来构造,这里使用[email protected]给自己发送邮件
						.javaMailProperties(p -> p.put("mail.debug", false)),
						e -> e.id("smtpOut"))
				.get();
				
	}

6.运行

(1)写文件结果

查看E:\springblog目录,发现多了两个文件,如图
Spring Boot实战(九)9.4 系统集成Spring Integration
engineering.txt文件内容如图
Spring Boot实战(九)9.4 系统集成Spring Integration
releases.txt文件内容如图
Spring Boot实战(九)9.4 系统集成Spring Integration
(2)邮箱接收结果
登录邮箱可以看取刚才发送的邮件