Spring Cloud Stream + RabbitMQ: Consumer Groups

This post walks through implementing consumer groups (message grouping) with Spring Cloud Stream and RabbitMQ.

Introduction to consumer groups

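In production, a service rarely runs as a single node. When several instances of the same service are started, they all bind to the same destination (topic) of the message channel. By default, when a producer sends a message to the bound channel, a copy of that message is delivered to every consumer instance. In some business scenarios, however, we want each message to be consumed by only one of the instances, no matter which one. Consumer groups provide exactly this, and they are simple to set up: on the consumer side, set the spring.cloud.stream.bindings.{channel-name}.group property. Instances that share the same group name then compete for messages, so each message is handled by only one of them.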
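The difference between "every instance gets a copy" and "one instance per group gets the message" can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the Spring Cloud Stream API: the class GroupDeliveryDemo, its methods, the group-B group, and the round-robin choice within a group are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of consumer groups; NOT the Spring Cloud Stream API.
class GroupDeliveryDemo {

    // group name -> names of the instances subscribed in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages already delivered to that group
    private final Map<String, Integer> cursor = new LinkedHashMap<>();

    void subscribe(String group, String instance) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
        cursor.putIfAbsent(group, 0);
    }

    // Returns the instances that receive this message: exactly one per group.
    // Round-robin selection inside a group is an assumption of this sketch.
    List<String> publish(String message) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int delivered = cursor.get(entry.getKey());
            receivers.add(members.get(delivered % members.size()));
            cursor.put(entry.getKey(), delivered + 1);
        }
        return receivers;
    }

    public static void main(String[] args) {
        GroupDeliveryDemo exchange = new GroupDeliveryDemo();
        exchange.subscribe("group-A", "StreamConsumer0");
        exchange.subscribe("group-A", "StreamConsumer1");
        exchange.subscribe("group-B", "Auditor"); // hypothetical second group
        for (int i = 0; i < 4; i++) {
            String message = "msg-" + i;
            System.out.println(message + " -> " + exchange.publish(message));
        }
    }
}
```

In this model every group still sees every message, but within group-A each message goes to only one of the two instances, which is the behaviour the group property is meant to provide.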

This post uses three projects: StreamProvider is the message producer, and StreamConsumer0 and StreamConsumer1 are the message consumers.

1 Parent Maven project

1.1 Project structure

[Screenshot: parent project structure]

1.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>com.study</groupId>
  <artifactId>cloud-ma</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>SpringCloudStudy</name>
  <description>SpringCloudStudy</description>
  
  <!-- Private repository configuration -->
  <repositories>          
	<repository>            
	    <id>nexus</id> <!-- must match the id configured in settings.xml -->
		<url>http://xxx.xxx.xxx.xxx:8081/repository/maven-public/</url>            
	    <releases>
		    <enabled>true</enabled>
	    </releases>           
		<snapshots>
			<enabled>true</enabled>
		</snapshots>          
	  </repository>               
   </repositories>
  
  <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>2.0.3.RELEASE</version>
     <relativePath/>
  </parent>
  
  <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <java.version>1.8</java.version>
     <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
  </properties>
  
  <dependencies>
  
    <!-- The parent declared above manages versions, so none are specified below -->
	    <!-- Pulls in the MVC, AOP, and related jars -->
	    <dependency>
	        <groupId>org.springframework.boot</groupId>
	        <artifactId>spring-boot-starter-web</artifactId>
	        <exclusions>
	            <exclusion><!-- exclude the default logging configuration -->
	                <groupId>org.springframework.boot</groupId>
	                <artifactId>spring-boot-starter-logging</artifactId>
	            </exclusion>
	        </exclusions>
	    </dependency>
	    
	    <!-- log4j2 -->
	    <dependency>
	        <groupId>org.springframework.boot</groupId>
	        <artifactId>spring-boot-starter-log4j2</artifactId>
	    </dependency>
	    <!-- end log4j2 -->
	    
	    <!-- YAML configuration support -->
	    <dependency>
	        <groupId>com.fasterxml.jackson.dataformat</groupId>
	        <artifactId>jackson-dataformat-yaml</artifactId>
	    </dependency>
	    <!-- end YAML configuration support -->
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    
    <!-- Hot reload -->
	<dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-devtools</artifactId>
	    <optional>true</optional>
	    <scope>runtime</scope>
	</dependency>
	
	<!-- Alibaba fastjson -->
	<dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>
    <!-- end Alibaba fastjson -->
    
  </dependencies>
  
  <dependencyManagement>
      <dependencies>
          <dependency>
              <groupId>org.springframework.cloud</groupId>
              <artifactId>spring-cloud-dependencies</artifactId>
              <version>${spring-cloud.version}</version>
              <type>pom</type>
              <scope>import</scope>
          </dependency>
      </dependencies>
  </dependencyManagement>
  
  <build>
     <plugins>
         <plugin>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
             <configuration>
	                <!-- devtools does not take effect without this setting -->
	                <fork>true</fork>
	         </configuration>
         </plugin>
     </plugins>
  </build>

  
  <modules>
  	<module>EurekaServer</module>
  	<module>EurekaClientHi</module>
    <module>EurekaClientRibbonCustomer</module>
    <module>EurekaClientHi2</module>
    <module>EurekaClientFeignCustomer</module>
    <module>EurekaClientZuul</module>
    <module>config_server</module>
    <module>config-client</module>
    <module>config-server-svn</module>
    <module>config-client-svn</module>
    <module>StreamProvider</module>
    <module>stream-output</module>
    <module>stream-input</module>
    <module>StreamRabbitMQSelf</module>
    <module>StreamConsumer0</module>
    <module>StreamConsumer1</module>
  </modules>
</project>

2 StreamProvider project (message producer)

2.1 Project structure

[Screenshot: StreamProvider project structure]

2.2 POM.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.study</groupId>
    <artifactId>cloud-ma</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>StreamProvider</artifactId>
  <packaging>jar</packaging>
  <name>StreamProvider</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

2.3 application.yml

server:
  port: 8089
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment:                                      # RabbitMQ connection settings
            spring: 
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: mazhen
                password: mazhen
                virtual-host: / 
      bindings: 
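        output:                                             # producer binding; "output" is the name of the message channel
          destination: exchange-msg                         # exchange name (the exchange type defaults to topic); binds the Spring Cloud Stream output channel to the exchange-msg exchange in RabbitMQ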
          content-type: application/json

Setting spring.cloud.stream.bindings.output.destination=exchange-msg creates an exchange named exchange-msg in RabbitMQ. In other words, it binds the Spring Cloud Stream output channel to the exchange-msg exchange.
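Because the exchange is a topic exchange, routing depends on how the routing key of a message compares with the binding key of a queue. To the best of my knowledge, the RabbitMQ binder publishes with the destination name as the routing key and binds consumer queues with the key # by default; treat both defaults as assumptions and check them against the binder version you use. The sketch below is a plain-Java illustration of topic matching (* matches exactly one word, # matches zero or more words); TopicMatchDemo is a hypothetical helper, not part of any library.

```java
// Minimal AMQP-style topic matcher: '*' = exactly one word, '#' = zero or more words.
// Illustrative helper only; RabbitMQ performs this matching on the broker side.
class TopicMatchDemo {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // Let '#' absorb 0..n of the remaining words and try the rest of the pattern.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word, but none are left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        // A queue bound with '#' receives a message routed with key "exchange-msg".
        System.out.println(matches("#", "exchange-msg"));        // true
        // A queue bound with an unrelated pattern does not.
        System.out.println(matches("orders.*", "exchange-msg")); // false
    }
}
```

Under those assumed defaults, every queue bound to exchange-msg receives every message the producer publishes, so whether a message is shared or duplicated is decided purely by how many queues exist, which is what the group setting controls.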

2.4 Message producer class

2.4.1 Message producer: interface

/**
 * 
 */
package com.stream.provider.rabbitMQ.service;

import org.springframework.integration.core.MessageSource;

/**
 * @author mazhen
 *
 */
public interface SendMsg {
	public MessageSource<Integer> timerMessageSource();
}

2.4.2 Message producer: implementation

/**
 * 
 */
package com.stream.provider.rabbitMQ.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;

import com.stream.provider.rabbitMQ.service.SendMsg;

/**
 * @author mazhen
 *
 */
@EnableBinding(value={Source.class})
public class SendMsgImpl implements SendMsg {
   
	private static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
	
	private Integer i=0;
	
	@Bean
	@InboundChannelAdapter(value = Source.OUTPUT , poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
	@Override
	public MessageSource<Integer> timerMessageSource() {
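		return () -> {
			// Log inside the supplier so that every polled message is logged,
			// not just the single call made when the bean is created.
			Integer value = i++;
			logger.info("Sending message: " + value);
			return new GenericMessage<>(value);
		};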
	}
}

2.5 Application entry point

package com.stream.provider;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Hello world!
 *
 */
@SpringBootApplication
public class StreamProviderApplication {
    public static void main( String[] args ) {
        SpringApplication.run(StreamProviderApplication.class, args);
    }
}

3 StreamConsumer0 project (consumer)

3.1 Project structure

[Screenshot: StreamConsumer0 project structure]

3.2 POM.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.study</groupId>
    <artifactId>cloud-ma</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>StreamConsumer0</artifactId>
  <name>StreamConsumer0</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
   
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

3.3 application.yml

server:
  port: 8090
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment:                                      # RabbitMQ connection settings
            spring: 
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: xxx
                password: xxx
                virtual-host: / 
      bindings: 
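        input:                                              # consumer binding; "input" is the name of the message channel
          group: group-A                                    # this node is a consumer in consumer group group-A
          destination: exchange-msg                         # exchange name (the exchange type defaults to topic); binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ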
          content-type: application/json

Setting spring.cloud.stream.bindings.input.destination=exchange-msg creates an exchange named exchange-msg in RabbitMQ if it does not exist yet, and binds the Spring Cloud Stream input channel to it. The input channel of StreamConsumer0 therefore corresponds to the output channel of StreamProvider, which makes StreamConsumer0 a consumer of StreamProvider. Setting spring.cloud.stream.bindings.input.group=group-A makes StreamConsumer0 a member of the consumer group group-A. Taken together, the two properties bind the input channel of StreamConsumer0 to the exchange-msg exchange and register the node as a consumer in group group-A on that exchange.
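The combination of destination and group is what determines the queue behind the binding: the test section of this post shows a queue named exchange-msg.group-A. The sketch below expresses that naming convention in plain Java. QueueNameDemo is a hypothetical helper, and the branch for a missing group (a unique "anonymous" queue per instance) reflects my understanding of the binder's behaviour; the exact suffix format used here is illustrative only.

```java
import java.util.UUID;

// Illustrates how destination and group combine into a queue name.
// Hypothetical helper; the binder builds the real name internally.
class QueueNameDemo {

    static String queueName(String destination, String group) {
        if (group == null || group.isEmpty()) {
            // Assumption: without a group, every instance gets its own uniquely
            // named queue, so every instance receives a copy of each message.
            return destination + ".anonymous." + UUID.randomUUID();
        }
        // With a group, all instances share one queue and compete for its messages.
        return destination + "." + group;
    }

    public static void main(String[] args) {
        // Both consumers in this post resolve to the same queue name.
        System.out.println(queueName("exchange-msg", "group-A"));
        // Two instances without a group resolve to two different queues.
        System.out.println(queueName("exchange-msg", null));
        System.out.println(queueName("exchange-msg", null));
    }
}
```

Since StreamConsumer0 and StreamConsumer1 use the same destination and the same group, they end up reading from one shared queue, and RabbitMQ hands each message in that queue to only one of them.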

3.4 Message consumer class

3.4.1 Message consumer: interface

/**
 * 
 */
package com.stream.consumer0.rabbitMQ.service;

/**
 * @author mazhen
 *
 */
public interface ReceviceMsg {
	public void receive(String payload);
}

3.4.2 Message consumer: implementation

/**
 * 
 */
package com.stream.consumer0.rabbitMQ.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.stream.consumer0.rabbitMQ.service.ReceviceMsg;

/**
 * @author mazhen
 *
 */
@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {

	private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);
	
	@StreamListener(Sink.INPUT)
	@Override
	public void receive(String payload) {
		logger.info("Received message: " + payload);
	}
}

3.5 Application entry point

package com.stream.consumer0;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point
 *
 */
@SpringBootApplication
public class StreamConsumer0Application {
    public static void main( String[] args ) {
        SpringApplication.run(StreamConsumer0Application.class, args);
    }
}

4 StreamConsumer1 project (consumer)

4.1 Project structure

[Screenshot: StreamConsumer1 project structure]

4.2 POM.xml

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.study</groupId>
    <artifactId>cloud-ma</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <artifactId>StreamConsumer1</artifactId>
  <name>StreamConsumer1</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
  
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
   
    
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

4.3 application.yml

server:
  port: 8091
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment:                                      # RabbitMQ connection settings
            spring: 
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                username: xxx
                password: xxx
                virtual-host: / 
      bindings: 
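        input:                                              # consumer binding; "input" is the name of the message channel
          group: group-A                                    # this node is a consumer in consumer group group-A
          destination: exchange-msg                         # exchange name (the exchange type defaults to topic); binds the Spring Cloud Stream input channel to the exchange-msg exchange in RabbitMQ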
          content-type: application/json

The configuration mirrors StreamConsumer0. Setting spring.cloud.stream.bindings.input.destination=exchange-msg binds the Spring Cloud Stream input channel of StreamConsumer1 to the exchange-msg exchange in RabbitMQ, creating the exchange if it does not exist yet, so StreamConsumer1 also becomes a consumer of StreamProvider. Setting spring.cloud.stream.bindings.input.group=group-A makes StreamConsumer1 a member of the consumer group group-A. Taken together, the two properties bind the input channel of StreamConsumer1 to the exchange-msg exchange and register the node as a consumer in group group-A on that exchange, the same group that StreamConsumer0 belongs to.

4.4 Message consumer class

4.4.1 Message consumer: interface

/**
 * 
 */
package com.stream.consumer1.rabbitMQ.service;

/**
 * @author mazhen
 *
 */
public interface ReceviceMsg {
	public void receive(String payload);
}

4.4.2 Message consumer: implementation

/**
 * 
 */
package com.stream.consumer1.rabbitMQ.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.stream.consumer1.rabbitMQ.service.ReceviceMsg;

/**
 * @author mazhen
 *
 */
@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {

	private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);
	
	@StreamListener(Sink.INPUT)
	@Override
	public void receive(String payload) {
		logger.info("Received message: " + payload);
	}
}

4.5 Application entry point

package com.stream.consumer1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Hello world!
 *
 */
@SpringBootApplication
public class StreamConsumer1Application {
    public static void main( String[] args ) {
        SpringApplication.run(StreamConsumer1Application.class, args);
    }
}

5 Testing

  • Start RabbitMQ
  • Start StreamConsumer0, StreamConsumer1, and StreamProvider, in that order

5.1 The exchange-msg exchange

As the screenshot shows, the exchange-msg exchange has been created in RabbitMQ:
[Screenshot: the exchange-msg exchange in RabbitMQ]

5.2 The exchange-msg.group-A queue

The exchange-msg.group-A queue has also been created in RabbitMQ:
[Screenshot: the exchange-msg.group-A queue in RabbitMQ]

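5.3 Messages received by the consumer nodes

Messages received by StreamConsumer0:
[Screenshot: StreamConsumer0 log output]
Messages received by StreamConsumer1:
[Screenshot: StreamConsumer1 log output]
The output of the two consumers shows that each message is received by only one consumer node. Any instance (node) in a consumer group can consume a message, but no message is consumed twice within the group.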
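Instead of comparing the two logs by eye, the same check can be done programmatically. The following is a minimal sketch, assuming the integer payloads have been collected from each consumer's log into a list; GroupCheckDemo and the sample values in main are hypothetical and are not taken from the actual test run.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Checks the two properties expected from a consumer group:
// no payload handled twice, and no payload lost.
class GroupCheckDemo {

    // True when no payload was handled by both consumers.
    static boolean disjoint(List<Integer> first, List<Integer> second) {
        Set<Integer> overlap = new HashSet<>(first);
        overlap.retainAll(second);
        return overlap.isEmpty();
    }

    // True when the two consumers together handled every payload in [from, to].
    static boolean covers(List<Integer> first, List<Integer> second, int from, int to) {
        Set<Integer> seen = new HashSet<>(first);
        seen.addAll(second);
        for (int payload = from; payload <= to; payload++) {
            if (!seen.contains(payload)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical payloads, as they might be copied from the two consumer logs.
        List<Integer> consumer0 = Arrays.asList(0, 2, 4);
        List<Integer> consumer1 = Arrays.asList(1, 3, 5);
        System.out.println("no duplicates: " + disjoint(consumer0, consumer1));
        System.out.println("all delivered: " + covers(consumer0, consumer1, 0, 5));
    }
}
```

If the group property were removed from both consumers, the first check would be expected to fail, because each instance would then receive its own copy of every message.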