使用Spring Integration进行消息处理
Spring Integration提供了Spring框架的扩展,以支持著名的企业集成模式。 它在基于Spring的应用程序中启用轻量级消息传递,并支持与外部系统的集成。 Spring Integration的最重要目标之一是为构建可维护且可测试的企业集成解决方案提供一个简单的模型。
主要成分
消息:它是任何Java对象的通用包装,这些对象与框架在处理该对象时使用的元数据结合在一起。 它由有效负载和标头组成。 消息有效负载可以是任何Java对象,消息头是字符串/对象映射,覆盖头名称和值。 MessageBuilder用于创建覆盖有效载荷和标头的消息,如下所示:
import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; Message message = MessageBuilder.withPayload("Message Payload") .setHeader("Message_Header1", "Message_Header1_Value") .setHeader("Message_Header2", "Message_Header2_Value") .build();
消息通道:消息通道是通过其移动消息的组件,因此可以将其视为消息生产者和使用者之间的管道。 生产者将消息发送到渠道,而消费者从渠道接收消息。 消息通道可以遵循点对点或发布/订阅语义。 使用点对点通道,最多一个消费者可以接收发送到该通道的每条消息。 使用发布/订阅通道,多个订阅者可以接收发送到该通道的每个消息。 Spring Integration支持这两种方式。
在此示例项目中,使用直接通道和空通道。 直接通道是Spring Integration中的默认通道类型,也是最简单的点对点通道选项。 空通道是一个虚拟消息通道,主要用于测试和调试。 它不会将消息从发送方发送到接收方,但其send方法始终返回true,而receive方法返回null值。 除了DirectChannel和NullChannel,Spring Integration还提供了不同的消息通道实现,例如PublishSubscribeChannel,QueueChannel,PriorityChannel,RendezvousChannel,ExecutorChannel和ScopedChannel。
消息端点:消息端点将应用程序代码与基础结构隔离。 换句话说,它是应用程序代码和消息传递框架之间的抽象层。
主要消息端点
转换程序:消息转换程序负责转换消息的内容或结构并返回修改后的消息。 例如:它可用于将消息有效负载从一种格式转换为另一种格式或修改消息头值。
过滤器:消息过滤器确定是否将消息传递到消息通道。
路由器:消息路由器决定哪个信道(如果可用)接下来应接收消息。
拆分器:拆分器将传入的消息分解为多个消息,并将其发送到适当的通道。
聚合器:聚合器将多个消息组合为单个消息。
服务**器:服务**器是用于将服务实例连接到消息传递系统的通用端点。
通道适配器:通道适配器是将消息通道连接到外部系统的端点。 通道适配器可以是入站的或出站的。 入站通道适配器端点将外部系统连接到MessageChannel。 出站通道适配器端点将MessageChannel连接到外部系统。
消息传递网关:网关是消息传递系统的入口,它对外部系统隐藏消息传递API。 通过覆盖请求和回复通道,它是双向的。
Spring Integration还提供了各种通道适配器和消息传递网关(用于AMQP,文件,Redis,Gemfire,Http,Jdbc,JPA,JMS,RMI,Stream等),以支持与外部系统的基于消息的通信。 请访问Spring Integration Reference文档以获取详细信息。
以下示例货物消息传递实现显示了易于理解的基本消息端点的行为。 货运信息系统通过使用CargoGateway接口监听来自外部系统的货运信息。 通过使用CargoSplitter,CargoFilter,CargoRouter,CargoTransformer MessageEndpoints处理收到的货物消息。 之后,已处理的成功的国内和国际货运消息将发送到CargoServiceActivator。
货物信息系统的Spring整合流程如下:
让我们看一下示例货物消息传递实现。
二手技术
- JDK 1.8.0_25
- 春天4.1.2
- Spring Integration 4.1.0
- Maven 3.2.2
- Ubuntu 14.04
项目层次结构如下:
步骤1:依存关系
依赖关系已添加到Maven pom.xml。
<properties> <spring.version>4.1.2.RELEASE</spring.version> <spring.integration.version>4.1.0.RELEASE</spring.integration.version> </properties> <dependencies> <!-- Spring 4 dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring Integration dependencies --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>${spring.integration.version}</version> </dependency> </dependencies>
第2步:货柜机
创建CargoBuilder来构建货运请求。
public class Cargo { public enum ShippingType { DOMESTIC, INTERNATIONAL } private final long trackingId; private final String receiverName; private final String deliveryAddress; private final double weight; private final String description; private final ShippingType shippingType; private final int deliveryDayCommitment; private final int region; private Cargo(CargoBuilder cargoBuilder) { this.trackingId = cargoBuilder.trackingId; this.receiverName = cargoBuilder.receiverName; this.deliveryAddress = cargoBuilder.deliveryAddress; this.weight = cargoBuilder.weight; this.description = cargoBuilder.description; this.shippingType = cargoBuilder.shippingType; this.deliveryDayCommitment = cargoBuilder.deliveryDayCommitment; this.region = cargoBuilder.region; } // Getter methods... @Override public String toString() { return "Cargo [trackingId=" + trackingId + ", receiverName=" + receiverName + ", deliveryAddress=" + deliveryAddress + ", weight=" + weight + ", description=" + description + ", shippingType=" + shippingType + ", deliveryDayCommitment=" + deliveryDayCommitment + ", region=" + region + "]"; } public static class CargoBuilder { private final long trackingId; private final String receiverName; private final String deliveryAddress; private final double weight; private final ShippingType shippingType; private int deliveryDayCommitment; private int region; private String description; public CargoBuilder(long trackingId, String receiverName, String deliveryAddress, double weight, ShippingType shippingType) { this.trackingId = trackingId; this.receiverName = receiverName; this.deliveryAddress = deliveryAddress; this.weight = weight; this.shippingType = shippingType; } public CargoBuilder setDeliveryDayCommitment(int deliveryDayCommitment) { this.deliveryDayCommitment = deliveryDayCommitment; return this; } public CargoBuilder setDescription(String description) { this.description = description; return this; } public CargoBuilder setRegion(int region) { this.region = region; return this; } public Cargo build() { Cargo cargo = new Cargo(this); if ((ShippingType.DOMESTIC == cargo.getShippingType()) && (cargo.getRegion() <= 0 || cargo.getRegion() > 4)) { throw new IllegalStateException("Region is invalid! Cargo Tracking Id : " + cargo.getTrackingId()); } return cargo; } }
步骤3:货运讯息
CargoMessage是“国内和国际货运消息”的父类。
public class CargoMessage { private final Cargo cargo; public CargoMessage(Cargo cargo) { this.cargo = cargo; } public Cargo getCargo() { return cargo; } @Override public String toString() { return cargo.toString(); } }
步骤4:国内货运讯息
DomesticCargoMessage类模拟国内货物消息。
public class DomesticCargoMessage extends CargoMessage { public enum Region { NORTH(1), SOUTH(2), EAST(3), WEST(4); private int value; private Region(int value) { this.value = value; } public static Region fromValue(int value) { return Arrays.stream(Region.values()) .filter(region -> region.value == value) .findFirst() .get(); } } private final Region region; public DomesticCargoMessage(Cargo cargo, Region region) { super(cargo); this.region = region; } public Region getRegion() { return region; } @Override public String toString() { return "DomesticCargoMessage [cargo=" + super.toString() + ", region=" + region + "]"; } }
步骤5:国际货运讯息
InternationalCargoMessage类模拟国际货运消息。
public class InternationalCargoMessage extends CargoMessage { public enum DeliveryOption { NEXT_FLIGHT, PRIORITY, ECONOMY, STANDART } private final DeliveryOption deliveryOption; public InternationalCargoMessage(Cargo cargo, DeliveryOption deliveryOption) { super(cargo); this.deliveryOption = deliveryOption; } public DeliveryOption getDeliveryOption() { return deliveryOption; } @Override public String toString() { return "InternationalCargoMessage [cargo=" + super.toString() + ", deliveryOption=" + deliveryOption + "]"; } }
步骤6:应用程序配置
AppConfiguration是Spring容器的配置提供程序类。 它创建消息通道并注册到Spring BeanFactory。 此外, @EnableIntegration启用导入的Spring集成配置,而@IntegrationComponentScan扫描特定于Spring Integration的组件。 它们都带有Spring Integration 4.0。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.MessageChannel; @Configuration @ComponentScan("com.onlinetechvision.integration") @EnableIntegration @IntegrationComponentScan("com.onlinetechvision.integration") public class AppConfiguration { /** * Creates a new cargoGWDefaultRequest Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoGWDefaultRequestChannel() { return new DirectChannel(); } /** * Creates a new cargoSplitterOutput Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoSplitterOutputChannel() { return new DirectChannel(); } /** * Creates a new cargoFilterOutput Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoFilterOutputChannel() { return new DirectChannel(); } /** * Creates a new cargoRouterDomesticOutput Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoRouterDomesticOutputChannel() { return new DirectChannel(); } /** * Creates a new cargoRouterInternationalOutput Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoRouterInternationalOutputChannel() { return new DirectChannel(); } /** * Creates a new cargoTransformerOutput Channel and registers to BeanFactory. * * @return direct channel */ @Bean public MessageChannel cargoTransformerOutputChannel() { return new DirectChannel(); } }
STEP 7:消息传递网关
CargoGateway接口向应用程序公开特定于域的方法。 换句话说,它提供了对消息传递系统的应用程序访问。 @MessagingGateway也是Spring Integration 4.0附带的,它简化了消息传递系统中的网关创建。 它的默认请求通道是cargoGWDefaultRequestChannel 。
import java.util.List; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.messaging.Message; import com.onlinetechvision.model.Cargo; @MessagingGateway(name = "cargoGateway", defaultRequestChannel = "cargoGWDefaultRequestChannel") public interface ICargoGateway { /** * Processes Cargo Request * * @param message SI Message covering Cargo List payload and Batch Cargo Id header. * @return operation result */ @Gateway void processCargoRequest(Message<List<Cargo>> message); }
步骤8:邮件分割器
CargoSplitter侦听cargoGWDefaultRequestChannel通道并将传入的“货物清单”分解为“货物”消息。 货物消息被发送到cargoSplitterOutputChannel。
import java.util.List; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Splitter; import org.springframework.messaging.Message; import com.onlinetechvision.model.Cargo; @MessageEndpoint public class CargoSplitter { /** * Splits Cargo List to Cargo message(s) * * @param message SI Message covering Cargo List payload and Batch Cargo Id header. * @return cargo list */ @Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel") public List<Cargo> splitCargoList(Message<List<Cargo>> message) { return message.getPayload(); } }
步骤9:消息过滤器
CargoFilter确定是否将消息传递到消息通道。 它侦听cargoSplitterOutputChannel通道并过滤超出重量限制的货物消息。 如果货运消息低于重量限制,则将其发送到cargoFilterOutputChannel通道。 如果货运消息高于重量限制,则将其发送到cargoFilterDiscardChannel通道。
import org.springframework.integration.annotation.Filter; import org.springframework.integration.annotation.MessageEndpoint; import com.onlinetechvision.model.Cargo; @MessageEndpoint public class CargoFilter { private static final long CARGO_WEIGHT_LIMIT = 1_000; /** * Checks weight of cargo and filters if it exceeds limit. * * @param Cargo message * @return check result */ @Filter(inputChannel="cargoSplitterOutputChannel", outputChannel="cargoFilterOutputChannel", discardChannel="cargoFilterDiscardChannel") public boolean filterIfCargoWeightExceedsLimit(Cargo cargo) { return cargo.getWeight() <= CARGO_WEIGHT_LIMIT; } }
步骤10:丢弃货物消息监听器
DiscardedCargoMessageListener侦听cargoFilterDiscard通道并处理由CargoFilter丢弃的Cargo消息。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.handler.annotation.Header; import com.onlinetechvision.model.Cargo; @MessageEndpoint public class DiscardedCargoMessageListener { private final Logger logger = LoggerFactory.getLogger(DiscardedCargoMessageListener.class); /** * Handles discarded domestic and international cargo request(s) and logs. * * @param cargo domestic/international cargo message * @param batchId message header shows cargo batch id */ @ServiceActivator(inputChannel = "cargoFilterDiscardChannel") public void handleDiscardedCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) { logger.debug("Message in Batch[" + batchId + "] is received with Discarded payload : " + cargo); } }
步骤11:邮件路由器
CargoRouter确定哪些频道(如果有)接下来应接收该消息。 它侦听cargoFilterOutputChannel通道并根据货物运输类型返回相关的通道名称。 换句话说,它将进入的货物消息路由到国内( cargoRouterDomesticOutputChannel )或国际( cargoRouterInternationalOutputChannel )货物通道。 另外,如果未设置运送类型,则返回nullChannel 。 nullChannel是一个虚拟消息通道,主要用于测试和调试。 它不会将消息从发送方发送到接收方,但其send方法始终返回true,而receive方法返回null值。
import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Router; import com.onlinetechvision.model.Cargo; import com.onlinetechvision.model.Cargo.ShippingType; @MessageEndpoint public class CargoRouter { /** * Determines cargo request' s channel in the light of shipping type. * * @param Cargo message * @return channel name */ @Router(inputChannel="cargoFilterOutputChannel") public String route(Cargo cargo) { if(cargo.getShippingType() == ShippingType.DOMESTIC) { return "cargoRouterDomesticOutputChannel"; } else if(cargo.getShippingType() == ShippingType.INTERNATIONAL) { return "cargoRouterInternationalOutputChannel"; } return "nullChannel"; } }
步骤12:讯息变压器
CargoTransformer侦听cargoRouterDomesticOutputChannel和cargoRouterInternationalOutputChannel并将传入的货运请求转换为国内和国际货运消息。 之后,它将它们发送到cargoTransformerOutputChannel通道。
import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Transformer; import com.onlinetechvision.model.Cargo; import com.onlinetechvision.model.DomesticCargoMessage; import com.onlinetechvision.model.DomesticCargoMessage.Region; import com.onlinetechvision.model.InternationalCargoMessage; import com.onlinetechvision.model.InternationalCargoMessage.DeliveryOption; @MessageEndpoint public class CargoTransformer { /** * Transforms Cargo request to Domestic Cargo obj. * * @param cargo * request * @return Domestic Cargo obj */ @Transformer(inputChannel = "cargoRouterDomesticOutputChannel", outputChannel = "cargoTransformerOutputChannel") public DomesticCargoMessage transformDomesticCargo(Cargo cargo) { return new DomesticCargoMessage(cargo, Region.fromValue(cargo.getRegion())); } /** * Transforms Cargo request to International Cargo obj. * * @param cargo * request * @return International Cargo obj */ @Transformer(inputChannel = "cargoRouterInternationalOutputChannel", outputChannel = "cargoTransformerOutputChannel") public InternationalCargoMessage transformInternationalCargo(Cargo cargo) { return new InternationalCargoMessage(cargo, getDeliveryOption(cargo.getDeliveryDayCommitment())); } /** * Get delivery option by delivery day commitment. * * @param deliveryDayCommitment delivery day commitment * @return delivery option */ private DeliveryOption getDeliveryOption(int deliveryDayCommitment) { if (deliveryDayCommitment == 1) { return DeliveryOption.NEXT_FLIGHT; } else if (deliveryDayCommitment == 2) { return DeliveryOption.PRIORITY; } else if (deliveryDayCommitment > 2 && deliveryDayCommitment < 5) { return DeliveryOption.ECONOMY; } else { return DeliveryOption.STANDART; } } }
步骤13:消息服务**器
CargoServiceActivator是用于将服务实例连接到消息传递系统的通用端点。 它侦听cargoTransformerOutputChannel通道并获取已处理的国内和国际货物消息和日志。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.handler.annotation.Header; import com.onlinetechvision.model.CargoMessage; @MessageEndpoint public class CargoServiceActivator { private final Logger logger = LoggerFactory.getLogger(CargoServiceActivator.class); /** * Gets processed domestic and international cargo request(s) and logs. * * @param cargoMessage domestic/international cargo message * @param batchId message header shows cargo batch id */ @ServiceActivator(inputChannel = "cargoTransformerOutputChannel") public void getCargo(CargoMessage cargoMessage, @Header("CARGO_BATCH_ID") long batchId) { logger.debug("Message in Batch[" + batchId + "] is received with payload : " + cargoMessage); } }
步骤14:申请
创建应用程序类以运行应用程序。 它初始化应用程序上下文并将货运请求发送到消息传递系统。
import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.messaging.support.MessageBuilder; import com.onlinetechvision.integration.ICargoGateway; import com.onlinetechvision.model.Cargo; import com.onlinetechvision.model.Cargo.ShippingType; public class Application { public static void main(String[] args) { ApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfiguration.class); ICargoGateway orderGateway = ctx.getBean(ICargoGateway.class); getCargoBatchMap().forEach( (batchId, cargoList) -> orderGateway.processCargoRequest(MessageBuilder .withPayload(cargoList) .setHeader("CARGO_BATCH_ID", batchId) .build())); } /** * Creates a sample cargo batch map covering multiple batches and returns. * * @return cargo batch map */ private static Map<Integer, List<Cargo>> getCargoBatchMap() { Map<Integer, List<Cargo>> cargoBatchMap = new HashMap<>(); cargoBatchMap.put(1, Arrays.asList( new Cargo.CargoBuilder(1, "Receiver_Name1", "Address1", 0.5, ShippingType.DOMESTIC) .setRegion(1).setDescription("Radio").build(), //Second cargo is filtered due to weight limit new Cargo.CargoBuilder(2, "Receiver_Name2", "Address2", 2_000, ShippingType.INTERNATIONAL) .setDeliveryDayCommitment(3).setDescription("Furniture").build(), new Cargo.CargoBuilder(3, "Receiver_Name3", "Address3", 5, ShippingType.INTERNATIONAL) .setDeliveryDayCommitment(2).setDescription("TV").build(), //Fourth cargo is not processed due to no shipping type found new Cargo.CargoBuilder(4, "Receiver_Name4", "Address4", 8, null) .setDeliveryDayCommitment(2).setDescription("Chair").build())); cargoBatchMap.put(2, Arrays.asList( //Fifth cargo is filtered due to weight limit new Cargo.CargoBuilder(5, "Receiver_Name5", "Address5", 1_200, ShippingType.DOMESTIC) .setRegion(2).setDescription("Refrigerator").build(), new Cargo.CargoBuilder(6, "Receiver_Name6", "Address6", 20, ShippingType.DOMESTIC) .setRegion(3).setDescription("Table").build(), //Seventh cargo is not processed due to no shipping type found new Cargo.CargoBuilder(7, "Receiver_Name7", "Address7", 5, null) .setDeliveryDayCommitment(1).setDescription("TV").build())); cargoBatchMap.put(3, Arrays.asList( new Cargo.CargoBuilder(8, "Receiver_Name8", "Address8", 200, ShippingType.DOMESTIC) .setRegion(2).setDescription("Washing Machine").build(), new Cargo.CargoBuilder(9, "Receiver_Name9", "Address9", 4.75, ShippingType.INTERNATIONAL) .setDeliveryDayCommitment(1).setDescription("Document").build())); return Collections.unmodifiableMap(cargoBatchMap); } }
步骤15:建立专案
货运请求的运营结果如下:
货物1:已成功发送到服务**器。
货物2:由于重量限制而被过滤。
货物3:已成功发送到服务**器。
货物4:由于没有运输类型,因此未处理。
货物5:由于重量限制而被过滤。
货物6:已成功发送到服务**器。
货物7:由于没有运输类型,因此未处理。
货物8:已成功发送到服务**器。
货物9:已成功发送到服务**器。
生成并运行项目后,将看到以下控制台输出日志:
2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=1, receiverName=Receiver_Name1, deliveryAddress=Address1, weight=0.5, description=Radio, shippingType=DOMESTIC, deliveryDayCommitment=0, region=1], region=NORTH] 2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[1] is received with Discarded payload : Cargo [trackingId=2, receiverName=Receiver_Name2, deliveryAddress=Address2, weight=2000.0, description=Furniture, shippingType=INTERNATIONAL, deliveryDayCommitment=3, region=0] 2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[1] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=3, receiverName=Receiver_Name3, deliveryAddress=Address3, weight=5.0, description=TV, shippingType=INTERNATIONAL, deliveryDayCommitment=2, region=0], deliveryOption=PRIORITY] 2014-12-09 23:43:51 [main] DEBUG c.o.i.DiscardedCargoMessageListener - Message in Batch[2] is received with Discarded payload : Cargo [trackingId=5, receiverName=Receiver_Name5, deliveryAddress=Address5, weight=1200.0, description=Refrigerator, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2] 2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[2] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=6, receiverName=Receiver_Name6, deliveryAddress=Address6, weight=20.0, description=Table, shippingType=DOMESTIC, deliveryDayCommitment=0, region=3], region=EAST] 2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : DomesticCargoMessage [cargo=Cargo [trackingId=8, receiverName=Receiver_Name8, deliveryAddress=Address8, weight=200.0, description=Washing Machine, shippingType=DOMESTIC, deliveryDayCommitment=0, region=2], region=SOUTH] 2014-12-09 23:43:51 [main] DEBUG c.o.i.CargoServiceActivator - Message in Batch[3] is received with payload : InternationalCargoMessage [cargo=Cargo [trackingId=9, receiverName=Receiver_Name9, deliveryAddress=Address9, weight=4.75, description=Document, shippingType=INTERNATIONAL, deliveryDayCommitment=1, region=0], deliveryOption=NEXT_FLIGHT]
源代码
参考文献
- 企业整合模式
- Spring集成参考手册
- Spring Integration 4.1.0.RELEASE API
- Pro Spring整合
- Spring Integration 3.0.2和4.0 Milestone 4发布
翻译自: https://www.javacodegeeks.com/2014/12/message-processing-with-spring-integration.html