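Spring Cloud Stream + RabbitMQ: Consumer Groups
This article records how to implement consumer groups (message grouping) with Spring Cloud Stream and RabbitMQ.
Introduction to consumer groups
In production, a service rarely runs as a single node. When several instances of the same service are started, they all bind to the same destination (topic) of the message channel. By default, when a producer publishes a message to the bound channel, every consumer instance receives and processes its own copy. In some business scenarios, however, a message should be consumed by only one of the instances, no matter which one. Consumer groups provide this behavior, and enabling them is simple: set the spring.cloud.stream.bindings.{channel-name}.group property on the consumer side.
This article uses three projects: StreamProvider is the message producer, and StreamConsumer0 and StreamConsumer1 are the message consumers.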
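The delivery rule described above can be pictured with a small in-memory model. This is only a sketch in plain Java, not the Spring Cloud Stream API: the classes Exchange, Group and Instance and the extra group-B subscriber are hypothetical, and round-robin delivery inside a group is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of consumer groups; not the Spring Cloud Stream API.
class ConsumerGroupSketch {

    // One consumer instance that records the payloads it receives.
    static class Instance {
        final String name;
        final List<Integer> received = new ArrayList<>();

        Instance(String name) {
            this.name = name;
        }
    }

    // A group models one shared queue: its instances compete for each message.
    static class Group {
        final List<Instance> instances = new ArrayList<>();
        int next = 0; // index of the instance that gets the next message

        void deliver(int payload) {
            instances.get(next).received.add(payload);
            next = (next + 1) % instances.size(); // assumption: simple round-robin
        }
    }

    // The exchange hands every message to each group exactly once.
    static class Exchange {
        final Map<String, Group> groups = new LinkedHashMap<>();

        Instance subscribe(String groupName, String instanceName) {
            Instance instance = new Instance(instanceName);
            groups.computeIfAbsent(groupName, k -> new Group()).instances.add(instance);
            return instance;
        }

        void publish(int payload) {
            for (Group group : groups.values()) {
                group.deliver(payload);
            }
        }
    }

    public static void main(String[] args) {
        Exchange exchange = new Exchange();
        Instance c0 = exchange.subscribe("group-A", "StreamConsumer0");
        Instance c1 = exchange.subscribe("group-A", "StreamConsumer1");
        Instance other = exchange.subscribe("group-B", "hypothetical-other-service");
        for (int i = 0; i < 4; i++) {
            exchange.publish(i);
        }
        System.out.println(c0.name + " " + c0.received);       // StreamConsumer0 [0, 2]
        System.out.println(c1.name + " " + c1.received);       // StreamConsumer1 [1, 3]
        System.out.println(other.name + " " + other.received); // hypothetical-other-service [0, 1, 2, 3]
    }
}
```

Within group-A each message reaches exactly one instance, while the separate group still sees every message.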
1 Parent Maven project
1.1 Project structure:
1.2 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>SpringCloudStudy</name>
<description>SpringCloudStudy</description>
<!-- Private repository configuration -->
<repositories>
<repository>
<id>nexus</id> <!-- must match the id configured in settings.xml -->
<url>http://xxx.xxx.xxx.xxx:8081/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<!-- The parent is declared above, so no versions need to be specified below -->
<!-- Includes the MVC, AOP and related jars -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion><!-- exclude the default logging configuration -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- log4j2 configuration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- end of log4j2 configuration -->
<!-- YAML configuration support -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<!-- end of YAML configuration support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Hot reload -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>runtime</scope>
</dependency>
<!-- begin: Alibaba fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- end: Alibaba fastjson -->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- devtools does not take effect without this setting -->
<fork>true</fork>
</configuration>
</plugin>
</plugins>
</build>
<modules>
<module>EurekaServer</module>
<module>EurekaClientHi</module>
<module>EurekaClientRibbonCustomer</module>
<module>EurekaClientHi2</module>
<module>EurekaClientFeignCustomer</module>
<module>EurekaClientZuul</module>
<module>config_server</module>
<module>config-client</module>
<module>config-server-svn</module>
<module>config-client-svn</module>
<module>StreamProvider</module>
<module>stream-output</module>
<module>stream-input</module>
<module>StreamRabbitMQSelf</module>
<module>StreamConsumer0</module>
<module>StreamConsumer1</module>
</modules>
</project>
2 StreamProvider project (message producer)
2.1 Project structure
2.2 POM.xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>StreamProvider</artifactId>
<packaging>jar</packaging>
<name>StreamProvider</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
2.3 application.yml
server:
  port: 8089
spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: mazhen
                password: mazhen
                virtual-host: /
      bindings:
        output: # producer binding; this is the name of the message channel
          destination: exchange-msg # exchange name; the default exchange type is topic. Binds the Spring Cloud Stream output channel to the exchange-msg exchange in RabbitMQ.
          content-type: application/json
Setting spring.cloud.stream.bindings.output.destination=exchange-msg creates an exchange named exchange-msg in RabbitMQ and binds the Spring Cloud Stream output channel to that exchange.
2.4 Message producer class
2.4.1 Message producer: interface
/**
*
*/
package com.stream.provider.rabbitMQ.service;
import org.springframework.integration.core.MessageSource;
/**
* @author mazhen
*
*/
public interface SendMsg {
public MessageSource<Integer> timerMessageSource();
}
2.4.2 Message producer: implementation
/**
*
*/
package com.stream.provider.rabbitMQ.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import com.stream.provider.rabbitMQ.service.SendMsg;
/**
* @author mazhen
*
*/
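@EnableBinding(value = {Source.class})
public class SendMsgImpl implements SendMsg {
    private static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
    private Integer i = 0;

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
    @Override
    public MessageSource<Integer> timerMessageSource() {
        // This method runs once at startup; the lambda below runs on every poll (every 2 seconds).
        // Logging inside the lambda reports each value that is actually sent.
        return () -> {
            Integer value = i++;
            logger.info("Sending message: " + value);
            return new GenericMessage<>(value);
        };
    }
}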
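The split between the bean method and the message source it returns is easy to miss: the method body runs once, and only the returned source is invoked on each poll. A minimal plain-Java sketch of that behavior follows. The Supplier stands in for MessageSource and the poll helper stands in for the poller; both are hypothetical simplifications, not Spring Integration types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a polled message source; plain Java, no Spring types.
class PolledSourceSketch {
    private int counter = 0;
    int factoryCalls = 0;

    // Plays the role of the @Bean method: it runs once and returns the source.
    Supplier<Integer> timerMessageSource() {
        factoryCalls++;
        // Only this lambda runs on every poll.
        return () -> counter++;
    }

    // Plays the role of the poller: asks the source for one payload per tick.
    static List<Integer> poll(Supplier<Integer> source, int ticks) {
        List<Integer> sent = new ArrayList<>();
        for (int t = 0; t < ticks; t++) {
            sent.add(source.get());
        }
        return sent;
    }

    public static void main(String[] args) {
        PolledSourceSketch sketch = new PolledSourceSketch();
        Supplier<Integer> source = sketch.timerMessageSource();
        System.out.println(poll(source, 3));     // [0, 1, 2]
        System.out.println(sketch.factoryCalls); // 1
    }
}
```

Any statement placed directly in the factory method, such as a log call, therefore executes once rather than once per message.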
2.5 Application class
package com.stream.provider;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Hello world!
*
*/
@SpringBootApplication
public class StreamProviderApplication {
public static void main( String[] args ) {
SpringApplication.run(StreamProviderApplication.class, args);
}
}
3 StreamConsumer0 project (consumer)
3.1 Project structure
3.2 POM.xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>StreamConsumer0</artifactId>
<name>StreamConsumer0</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
3.3 application.yml
server:
  port: 8090
spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: xxx
                password: xxx
                virtual-host: /
      bindings:
        input: # consumer binding; this is the name of the message channel
          group: group-A # this node is one consumer in the consumer group group-A
          destination: exchange-msg # exchange name; the default exchange type is topic. Binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ.
          content-type: application/json
Setting spring.cloud.stream.bindings.input.destination=exchange-msg binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ, creating the exchange if it does not exist yet. The input channel of StreamConsumer0 therefore corresponds to the output channel of StreamProvider, which makes StreamConsumer0 a consumer of StreamProvider. Setting spring.cloud.stream.bindings.input.group=group-A makes StreamConsumer0 a member of the consumer group group-A. Taken together, the two properties bind the input channel of StreamConsumer0 to the exchange-msg exchange and register the node as a consumer in the group-A consumer group of that exchange.
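Why two instances with the same group compete for messages becomes clearer when looking at the queue each one reads from. The sketch below derives a queue name from destination and group: the destination.group form matches the exchange-msg.group-A queue that appears in the test section, while the naming of the group-less (anonymous) queue is an assumption made for illustration. The helper queueName is hypothetical and not a binder API.

```java
import java.util.UUID;

// Hypothetical helper that mimics how a queue name follows from destination and group.
class QueueNameSketch {

    static String queueName(String destination, String group) {
        if (group == null || group.isEmpty()) {
            // Assumption: without a group, every instance gets its own uniquely named queue,
            // so every instance receives a copy of each message.
            return destination + ".anonymous." + UUID.randomUUID();
        }
        // With a group, all instances share one queue and compete for its messages.
        return destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(queueName("exchange-msg", "group-A")); // exchange-msg.group-A
        System.out.println(queueName("exchange-msg", null));      // unique name per call
    }
}
```

StreamConsumer0 and StreamConsumer1 use the same destination and group, so both resolve to the same queue name and each message in that queue goes to only one of them.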
3.4 Message consumer class
3.4.1 Message consumer: interface
/**
*
*/
package com.stream.consumer0.rabbitMQ.service;
/**
* @author mazhen
*
*/
public interface ReceviceMsg {
public void receive(String payload);
}
3.4.2 Message consumer: implementation
/**
*
*/
package com.stream.consumer0.rabbitMQ.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import com.stream.consumer0.rabbitMQ.service.ReceviceMsg;
/**
* @author mazhen
*
*/
@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {
    private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);

    // Invoked for every message that arrives on the input channel
    @StreamListener(Sink.INPUT)
    @Override
    public void receive(String payload) {
        logger.info("Received message: " + payload);
    }
}
3.5 Application class
package com.stream.consumer0;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Application entry point
*
*/
@SpringBootApplication
public class StreamConsumer0Application {
public static void main( String[] args ) {
SpringApplication.run(StreamConsumer0Application.class, args);
}
}
4 StreamConsumer1 project (consumer)
4.1 Project structure
4.2 POM.xml
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.study</groupId>
<artifactId>cloud-ma</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>StreamConsumer1</artifactId>
<name>StreamConsumer1</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
4.3 application.yml
server:
  port: 8091
spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: # RabbitMQ connection settings
            spring:
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: xxx
                password: xxx
                virtual-host: /
      bindings:
        input: # consumer binding; this is the name of the message channel
          group: group-A # this node is one consumer in the consumer group group-A
          destination: exchange-msg # exchange name; the default exchange type is topic. Binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ.
          content-type: application/json
Setting spring.cloud.stream.bindings.input.destination=exchange-msg binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ, creating the exchange if it does not exist yet. The input channel of StreamConsumer1 therefore corresponds to the output channel of StreamProvider, which makes StreamConsumer1 a consumer of StreamProvider. Setting spring.cloud.stream.bindings.input.group=group-A makes StreamConsumer1 a member of the consumer group group-A. Taken together, the two properties bind the input channel of StreamConsumer1 to the exchange-msg exchange and register the node as a consumer in the group-A consumer group of that exchange.
4.4 Message consumer class
4.4.1 Message consumer: interface
/**
*
*/
package com.stream.consumer1.rabbitMQ.service;
/**
* @author mazhen
*
*/
public interface ReceviceMsg {
public void receive(String payload);
}
4.4.2 Message consumer: implementation
/**
*
*/
package com.stream.consumer1.rabbitMQ.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import com.stream.consumer1.rabbitMQ.service.ReceviceMsg;
/**
* @author mazhen
*
*/
@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {
    private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);

    // Invoked for every message that arrives on the input channel
    @StreamListener(Sink.INPUT)
    @Override
    public void receive(String payload) {
        logger.info("Received message: " + payload);
    }
}
4.5 Application class
package com.stream.consumer1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Hello world!
*
*/
@SpringBootApplication
public class StreamConsumer1Application {
public static void main( String[] args ) {
SpringApplication.run(StreamConsumer1Application.class, args);
}
}
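5 Testing
- Start RabbitMQ
- Start StreamConsumer0, StreamConsumer1 and StreamProvider, in that order
5.1 The exchange-msg exchange
As the figure below shows, RabbitMQ has created the exchange-msg exchange:
5.2 The exchange-msg.group-A queue
RabbitMQ has also created the exchange-msg.group-A queue:
5.3 Messages received by the consumer nodes
Messages received by StreamConsumer0:
Messages received by StreamConsumer1:
The output of the two consumers shows that each message is received by only one of the consumer nodes. Any instance (node) in a consumer group can consume a message, but each message is consumed only once within the group, which avoids duplicate consumption.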
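Comparing two consumer logs by eye gets tedious once many messages have been sent. The following sketch checks the same property programmatically. The helper names are hypothetical, and the payload lists in main are made-up sample data, not the output of the run described above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for checking consumer logs; not part of the original projects.
class DeliveryCheckSketch {

    // Payloads that appear in both consumers' logs, i.e. messages consumed twice.
    static Set<Integer> duplicates(List<Integer> first, List<Integer> second) {
        Set<Integer> common = new TreeSet<>(first);
        common.retainAll(second);
        return common;
    }

    // Payloads in 0..sentCount-1 that neither consumer logged, i.e. messages not consumed.
    static Set<Integer> missing(int sentCount, List<Integer> first, List<Integer> second) {
        Set<Integer> absent = new TreeSet<>();
        for (int payload = 0; payload < sentCount; payload++) {
            absent.add(payload);
        }
        absent.removeAll(first);
        absent.removeAll(second);
        return absent;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for payloads parsed from the two logs.
        List<Integer> consumer0 = Arrays.asList(0, 2, 4);
        List<Integer> consumer1 = Arrays.asList(1, 3, 5);
        System.out.println("duplicates: " + duplicates(consumer0, consumer1)); // duplicates: []
        System.out.println("missing: " + missing(6, consumer0, consumer1));    // missing: []
    }
}
```

With consumer groups working as intended, both sets should be empty: no payload shows up in both logs, and every payload shows up in one of them.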