SpringCloud微服务 Stream集成RabbitMQ(一)
前言
本小节我们将学习如何使用 Spring Cloud Stream 集成 RabbitMQ,并实现常规业务场景中的发布/订阅模式及其确认机制。
在学习本小节之前,我假设大家已具备在实际场景中应用 Spring Cloud Stream 和 RabbitMQ 的能力;如果暂未掌握请自行学习,这里不再赘述。
通常而言,在发布/订阅模式中,消息的发送者一般只注重将消息推送到相应的 Exchange 对应的 Channel 中,并不在意订阅者是否成功接收并消费掉某条消息。消息发布者只负责把消息送到队列中,订阅者只负责把消息从队列中取出然后消费,两者在业务逻辑上理应不存在任何耦合或关联,这也是发布/订阅模式的职责和优点所在。
案例
案例说明:本小节将创建两个服务节点 Publisher 和 Subscribe,两者互为消息的发布者和订阅者。即 Pub 是 Sub 的消息发布者,同时为了实现消息确认机制,Pub 也是 Sub 的订阅者;Sub 之于 Pub 也是如此。
-
新建 microservice-deal-cloud-stream-rabbitmq-publisher服务节点
- 项目结构
- Core Code
-
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>microservice-deal-cloud-stream-rabbitmq-publisher</artifactId> <packaging>jar</packaging> <name>microservice-deal-cloud-stream-rabbitmq-publisher</name> <description>Demo project for Spring Boot</description> <parent> <groupId>com.example</groupId> <artifactId>microservice-deal-parent</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <!-- 包含 mvc,aop 等jar资源 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion><!-- 去除默认log配置 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <!-- 配置log4j2 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!-- 配置log4j2 --> <!-- 支持识别yml配置 --> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> </dependency> <!-- 支持识别yml配置 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> </project>
-
application.yml
server: port: 8081 spring: application: name: microservice-deal-cloud-stream-rabbitmq-publisher #与 microservice-deal-cloud-stream-rabbitmq-subscribe互为 Pub/Sub cloud: stream: binders: defaultRabbit: type: rabbit environment: #配置rabbimq连接环境 spring: rabbitmq: host: localhost username: Dustyone password: bai5331359 virtual-host: / bindings: output: destination: Subscribe #exchange名称,交换模式默认是topic content-type: application/json input: destination: Publisher content-type: application/json eureka: client: serviceUrl: #defaultZone: http://Dustyone:[email protected]:8080/eureka/ defaultZone: http://localhost:8080/eureka/ #http://eureka.springcloud.cn/eureka/ #Authorization instance: hostname: Swift prefer-ip-address: true metadata-map: zone: Asia #Eureka可以解析的metadata,会影响到客户端的行为 customizedMetadata: eurekaCustomizedMetadata #Eureka不能解析的metadata,不会影响客户端行为 块通过 http://localhost:8761/eureka/apps/{serviceName}查找 #instanceId: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}} instanceId: ${spring.application.name}:${server.port} feign: hystrix: enabled: true
-
ReceiveMsgImpl.java
package com.example.deal.service.impl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.example.deal.service.ReceiveMsg; /** * * @ClassName::ReceiveMsgImpl * @author :Dustyone * @date :2019年4月3日 上午11:06:25 */ @Component @EnableBinding(value = { Sink.class }) public class ReceiveMsgImpl implements ReceiveMsg { /** * 引入日志,注意都是"org.slf4j"包下 */ private final static Logger logger = LoggerFactory.getLogger(ReceiveMsgImpl.class); @Override @StreamListener(Sink.INPUT) public void receiveTime(Message<String> message) { logger.info("接收消息" + message.getPayload().toString()); } }
-
SendMsgImpl.java
package com.example.deal.service.impl;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import com.example.deal.service.SendMsg;

/**
 * Message producer bound to {@link Source}: registers a polled
 * {@link MessageSource} that emits the current time as a formatted String on
 * the {@link Source#OUTPUT} channel.
 *
 * @author Dustyone
 * @date 2019-04-03 11:05:08 AM
 */
@EnableBinding(value = { Source.class })
public class SendMsgImpl implements SendMsg {

    /** Logger for this class; note that both types come from the {@code org.slf4j} package. */
    private static final Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);

    /**
     * Timestamp pattern. {@code MM} is month-of-year; the lowercase {@code mm}
     * means minute-of-hour and must only appear in the time part.
     */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Creates the message source attached to {@link Source#OUTPUT}. The poller
     * is configured with {@code fixedDelay = "2000"} and
     * {@code maxMessagesPerPoll = "1"}; every poll produces a new message
     * carrying the time at which it was polled.
     *
     * @return a message source yielding the current time formatted with
     *         {@code yyyy-MM-dd HH:mm:ss}
     */
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
    @Override
    public MessageSource<String> sendTime() {
        // This statement runs when the method is invoked to build the source,
        // not on each poll; only the lambda below is evaluated per message.
        logger.info(currentTime());
        return () -> new GenericMessage<>(currentTime());
    }

    /**
     * Formats the current instant using {@link #TIME_PATTERN}. A new
     * {@link SimpleDateFormat} is created per call because that class is not
     * thread-safe.
     */
    private static String currentTime() {
        return new SimpleDateFormat(TIME_PATTERN).format(new Date());
    }
}
-
ReceiveMsg.java
package com.example.deal.service;

import org.springframework.messaging.Message;

/**
 * Contract for a consumer of String-payload messages.
 *
 * @author Dustyone
 * @date 2019-04-03 11:06:09 AM
 */
public interface ReceiveMsg {

    /**
     * Handles a single received message.
     *
     * @param message the received message carrying a String payload
     */
    void receiveTime(Message<String> message);
}
-
SendMsg.java
package com.example.deal.service;

import org.springframework.integration.core.MessageSource;

/**
 * Contract for a producer that exposes a source of String-payload messages.
 *
 * @author Dustyone
 * @date 2019-04-03 11:04:30 AM
 */
public interface SendMsg {

    /**
     * Supplies the message source from which outgoing messages are obtained.
     *
     * @return a source of messages with String payloads
     */
    MessageSource<String> sendTime();
}
-
- 项目结构
-
新建microservice-deal-cloud-stream-rabbitmq-subscribe服务节点
- 项目结构
- Core Code
-
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>microservice-deal-cloud-stream-rabbitmq-subscribe</artifactId> <packaging>jar</packaging> <name>microservice-deal-cloud-stream-rabbitmq-subscribe</name> <description>Demo project for Spring Boot</description> <parent> <groupId>com.example</groupId> <artifactId>microservice-deal-parent</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <dependencies> <!-- 包含 mvc,aop 等jar资源 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion><!-- 去除默认log配置 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <!-- 配置log4j2 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!-- 配置log4j2 --> <!-- 支持识别yml配置 --> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> </dependency> <!-- 支持识别yml配置 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> </project>
-
application.yml
server: port: 8082 spring: application: name: microservice-deal-cloud-stream-rabbitmq-subscribe cloud: stream: binders: defaultRabbit: type: rabbit environment: #配置rabbimq连接环境 spring: rabbitmq: host: localhost username: Dustyone password: bai5331359 virtual-host: / bindings: output: destination: Publisher #exchange名称,交换模式默认是topic content-type: application/json input: destination: Subscribe content-type: application/json eureka: client: serviceUrl: #defaultZone: http://Dustyone:[email protected]:8080/eureka/ defaultZone: http://localhost:8080/eureka/ #http://eureka.springcloud.cn/eureka/ #Authorization instance: hostname: Swift prefer-ip-address: true metadata-map: zone: Asia #Eureka可以解析的metadata,会影响到客户端的行为 customizedMetadata: eurekaCustomizedMetadata #Eureka不能解析的metadata,不会影响客户端行为 块通过 http://localhost:8761/eureka/apps/{serviceName}查找 #instanceId: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}} instanceId: ${spring.application.name}:${server.port} feign: hystrix: enabled: true
-
ReceiveMsgImpl.java
package com.example.deal.service.impl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import com.example.deal.service.ReceiveMsg; /** * * @ClassName::ReceiveMsgImpl * @author :Dustyone * @date :2019年4月3日 上午11:06:25 */ @Component @EnableBinding(value = { Sink.class }) public class ReceiveMsgImpl implements ReceiveMsg { /** * 引入日志,注意都是"org.slf4j"包下 */ private final static Logger logger = LoggerFactory.getLogger(ReceiveMsgImpl.class); @Override @StreamListener(Sink.INPUT) public void receiveTime(Message<String> message) { logger.info("接收消息" + message.getPayload().toString()); } }
-
SendMsgImpl.java
package com.example.deal.service.impl;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import com.example.deal.service.SendMsg;

/**
 * Message producer bound to {@link Source}: registers a polled
 * {@link MessageSource} that emits the current time as a formatted String on
 * the {@link Source#OUTPUT} channel.
 *
 * @author Dustyone
 * @date 2019-04-03 11:05:08 AM
 */
@EnableBinding(value = { Source.class })
public class SendMsgImpl implements SendMsg {

    /** Logger for this class; note that both types come from the {@code org.slf4j} package. */
    private static final Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);

    /**
     * Timestamp pattern. {@code MM} is month-of-year; the lowercase {@code mm}
     * means minute-of-hour and must only appear in the time part.
     */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Creates the message source attached to {@link Source#OUTPUT}. The poller
     * is configured with {@code fixedDelay = "2000"} and
     * {@code maxMessagesPerPoll = "1"}; every poll produces a new message
     * carrying the time at which it was polled.
     *
     * @return a message source yielding the current time formatted with
     *         {@code yyyy-MM-dd HH:mm:ss}
     */
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
    @Override
    public MessageSource<String> sendTime() {
        // This statement runs when the method is invoked to build the source,
        // not on each poll; only the lambda below is evaluated per message.
        logger.info(currentTime());
        return () -> new GenericMessage<>(currentTime());
    }

    /**
     * Formats the current instant using {@link #TIME_PATTERN}. A new
     * {@link SimpleDateFormat} is created per call because that class is not
     * thread-safe.
     */
    private static String currentTime() {
        return new SimpleDateFormat(TIME_PATTERN).format(new Date());
    }
}
-
ReceiveMsg.java
package com.example.deal.service;

import org.springframework.messaging.Message;

/**
 * Contract for a consumer of String-payload messages.
 *
 * @author Dustyone
 * @date 2019-04-03 11:06:09 AM
 */
public interface ReceiveMsg {

    /**
     * Handles a single received message.
     *
     * @param message the received message carrying a String payload
     */
    void receiveTime(Message<String> message);
}
-
SendMsg.java
package com.example.deal.service;

import org.springframework.integration.core.MessageSource;

/**
 * Contract for a producer that exposes a source of String-payload messages.
 *
 * @author Dustyone
 * @date 2019-04-03 11:04:30 AM
 */
public interface SendMsg {

    /**
     * Supplies the message source from which outgoing messages are obtained.
     *
     * @return a source of messages with String payloads
     */
    MessageSource<String> sendTime();
}
-
- 项目结构
-
分别运行Publisher和Subscribe
- Publisher
- Subscribe
- Publisher
小结
- 完整源码参考
microservice-deal-cloud-stream-rabbitmq-publisher和microservice-deal-cloud-stream-rabbitmq-subscribe