SpringCloud微服务 Stream集成RabbitMQ(一)

前言

本小结我们将学习一下如何使用Spring Cloud Stream 来集成Rabbit MQ 并实现常规业务场景中的发布/订阅模式及其确认机制。
在学习本小结之前我假设大家已对Spring Cloud Stream 和Rabbit MQ 有实际场景应用的能力,如果暂未掌握请自行学习,这里不赘述。

通常而言,对于发布/订阅模式模式而言,消息的发送者一般只注重将消息推送到相应的Exchange 对应的Channel中,并不在意订阅者是否成功接收并消费掉某条消息。消息发布者只负责把消息送到队列中,订阅者只负责把消息从队列中取出然后消费,两者在业务逻辑上理应是不存在任何耦合或关联的,这也是发布/订阅模式的职责和优点所在。

案例

案例说明:本小结将创建两个服务节点Publisher和Subscribe 其中两者互为消息的发布和订阅者。即Pub是Sub的消息发布者,同时为了实现消息确认机制Pub也是Sub的订阅者,Sub至于Pub也是如此。

  • 新建 microservice-deal-cloud-stream-rabbitmq-publisher服务节点

    • 项目结构
      SpringCloud微服务 Stream集成RabbitMQ(一)
    • Core Code
      • pom.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
        	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        	<modelVersion>4.0.0</modelVersion>
        
        	<artifactId>microservice-deal-cloud-stream-rabbitmq-publisher</artifactId>
        	<packaging>jar</packaging>
        
        	<name>microservice-deal-cloud-stream-rabbitmq-publisher</name>
        	<description>Demo project for Spring Boot</description>
        
        	<parent>
        		<groupId>com.example</groupId>
        		<artifactId>microservice-deal-parent</artifactId>
        		<version>0.0.1-SNAPSHOT</version>
        	</parent>
        
        	<dependencies>
        
        		<!-- 包含 mvc,aop 等jar资源 -->
        		<dependency>
        			<groupId>org.springframework.boot</groupId>
        			<artifactId>spring-boot-starter-web</artifactId>
        			<exclusions>
        				<exclusion><!-- 去除默认log配置 -->
        					<groupId>org.springframework.boot</groupId>
        					<artifactId>spring-boot-starter-logging</artifactId>
        				</exclusion>
        			</exclusions>
        		</dependency>
        
        		<!-- 配置log4j2 -->
        		<dependency>
        			<groupId>org.springframework.boot</groupId>
        			<artifactId>spring-boot-starter-log4j2</artifactId>
        		</dependency>
        		<!-- 配置log4j2 -->
        
        		<!-- 支持识别yml配置 -->
        		<dependency>
        			<groupId>com.fasterxml.jackson.dataformat</groupId>
        			<artifactId>jackson-dataformat-yaml</artifactId>
        		</dependency>
        		<!-- 支持识别yml配置 -->
        		<dependency>
        			<groupId>org.springframework.cloud</groupId>
        			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        		</dependency>
        	</dependencies>
        
        </project>
        
        
      • application.yml

        server:
          port: 8081
        spring:
          application:
            name: microservice-deal-cloud-stream-rabbitmq-publisher #与 microservice-deal-cloud-stream-rabbitmq-subscribe互为 Pub/Sub
            
          cloud:
            stream:
              binders: 
                defaultRabbit: 
                  type: rabbit
                  environment: #配置rabbimq连接环境
                    spring: 
                      rabbitmq:
                        host: localhost
                        username: Dustyone
                        password: bai5331359
                        virtual-host: / 
              bindings: 
                output:
                  destination: Subscribe  #exchange名称,交换模式默认是topic
                  content-type: application/json
                input: 
                  destination: Publisher
                  content-type: application/json
        
        
        eureka:
          client:
            serviceUrl:
               #defaultZone: http://Dustyone:[email protected]:8080/eureka/
               defaultZone: http://localhost:8080/eureka/ #http://eureka.springcloud.cn/eureka/
              #Authorization
              
          instance:
            hostname: Swift
            prefer-ip-address: true
            metadata-map:
              zone: Asia #Eureka可以解析的metadata,会影响到客户端的行为
              customizedMetadata: eurekaCustomizedMetadata #Eureka不能解析的metadata,不会影响客户端行为  块通过 http://localhost:8761/eureka/apps/{serviceName}查找
              #instanceId: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
              instanceId: ${spring.application.name}:${server.port}
              
        feign:
          hystrix:
            enabled: true
        
      • ReceiveMsgImpl.java

        package com.example.deal.service.impl;
        
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.annotation.StreamListener;
        import org.springframework.cloud.stream.messaging.Sink;
        import org.springframework.messaging.Message;
        import org.springframework.stereotype.Component;
        
        import com.example.deal.service.ReceiveMsg;
        
        /**
         * 
         * @ClassName::ReceiveMsgImpl
         * @author :Dustyone
         * @date :2019年4月3日 上午11:06:25
         */
        @Component
        @EnableBinding(value = { Sink.class })
        public class ReceiveMsgImpl implements ReceiveMsg {
        
        	/**
        	 * 引入日志,注意都是"org.slf4j"包下
        	 */
        	private final static Logger logger = LoggerFactory.getLogger(ReceiveMsgImpl.class);
        
        	@Override
        	@StreamListener(Sink.INPUT)
        	public void receiveTime(Message<String> message) {
        		logger.info("接收消息" + message.getPayload().toString());
        	}
        
        }
        
        
      • SendMsgImpl.java

        package com.example.deal.service.impl;
        
        import java.text.SimpleDateFormat;
        import java.util.Date;
        
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.messaging.Source;
        import org.springframework.context.annotation.Bean;
        import org.springframework.integration.annotation.InboundChannelAdapter;
        import org.springframework.integration.annotation.Poller;
        import org.springframework.integration.core.MessageSource;
        import org.springframework.messaging.support.GenericMessage;
        
        import com.example.deal.service.SendMsg;
        
        /**
         * 
        * @ClassName::SendMsgImpl 
        * @author :Dustyone
        * @date :2019年4月3日 上午11:05:08
         */
        
        @EnableBinding(value = { Source.class })
        public class SendMsgImpl implements SendMsg {
        
        	/**
        	 * 引入日志,注意都是"org.slf4j"包下
        	 */
        	private final static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
        
        	private String format = "yyyy-mm-dd  HH:mm:ss";
        	
        	
        	@Bean
        	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
        	@Override
        	public MessageSource<String> sendTime() {
        		logger.info(new SimpleDateFormat(format).format(new Date()));
        		return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
        	}
        
        }
        
        
      • ReceiveMsg.java

        package com.example.deal.service;
        
        import org.springframework.messaging.Message;
        
        /**
         * 
         * @ClassName::ReceiveMsg
         * @author :Dustyone
         * @date :2019年4月3日 上午11:06:09
         */
        public interface ReceiveMsg {
        	public void receiveTime(Message<String> message);
        }
        
        
      • SendMsg.java

        package com.example.deal.service;
        
        import org.springframework.integration.core.MessageSource;
        
        /**
         * 
        * @ClassName::SendMsg 
        * @author :Dustyone
        * @date :2019年4月3日 上午11:04:30
         */
        public interface SendMsg {
        
        	public MessageSource<String> sendTime();
        
        }
        
        
  • 新建microservice-deal-cloud-stream-rabbitmq-subscribe服务节点

    • 项目结构
      SpringCloud微服务 Stream集成RabbitMQ(一)
    • Core Code
      • pom.xml

        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
        	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        	<modelVersion>4.0.0</modelVersion>
        
        	<artifactId>microservice-deal-cloud-stream-rabbitmq-subscribe</artifactId>
        	<packaging>jar</packaging>
        
        	<name>microservice-deal-cloud-stream-rabbitmq-subscribe</name>
        	<description>Demo project for Spring Boot</description>
        
        	<parent>
        		<groupId>com.example</groupId>
        		<artifactId>microservice-deal-parent</artifactId>
        		<version>0.0.1-SNAPSHOT</version>
        	</parent>
        
        	<dependencies>
        
        		<!-- 包含 mvc,aop 等jar资源 -->
        		<dependency>
        			<groupId>org.springframework.boot</groupId>
        			<artifactId>spring-boot-starter-web</artifactId>
        			<exclusions>
        				<exclusion><!-- 去除默认log配置 -->
        					<groupId>org.springframework.boot</groupId>
        					<artifactId>spring-boot-starter-logging</artifactId>
        				</exclusion>
        			</exclusions>
        		</dependency>
        
        		<!-- 配置log4j2 -->
        		<dependency>
        			<groupId>org.springframework.boot</groupId>
        			<artifactId>spring-boot-starter-log4j2</artifactId>
        		</dependency>
        		<!-- 配置log4j2 -->
        
        		<!-- 支持识别yml配置 -->
        		<dependency>
        			<groupId>com.fasterxml.jackson.dataformat</groupId>
        			<artifactId>jackson-dataformat-yaml</artifactId>
        		</dependency>
        		<!-- 支持识别yml配置 -->
        		<dependency>
        			<groupId>org.springframework.cloud</groupId>
        			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        		</dependency>
        
        	</dependencies>
        
        </project>
        
        
      • application.yml

        server:
          port: 8082
        spring:
          application:
            name: microservice-deal-cloud-stream-rabbitmq-subscribe
          cloud:
            stream:
              binders: 
                defaultRabbit: 
                  type: rabbit
                  environment: #配置rabbimq连接环境
                    spring: 
                      rabbitmq:
                        host: localhost
                        username: Dustyone
                        password: bai5331359
                        virtual-host: / 
              bindings: 
                output:
                  destination: Publisher   #exchange名称,交换模式默认是topic
                  content-type: application/json
                input: 
                  destination: Subscribe
                  content-type: application/json
        eureka:
          client:
            serviceUrl:
               #defaultZone: http://Dustyone:[email protected]:8080/eureka/
               defaultZone: http://localhost:8080/eureka/ #http://eureka.springcloud.cn/eureka/
              #Authorization
              
          instance:
            hostname: Swift
            prefer-ip-address: true
            metadata-map:
              zone: Asia #Eureka可以解析的metadata,会影响到客户端的行为
              customizedMetadata: eurekaCustomizedMetadata #Eureka不能解析的metadata,不会影响客户端行为  块通过 http://localhost:8761/eureka/apps/{serviceName}查找
              #instanceId: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
              instanceId: ${spring.application.name}:${server.port}
              
        feign:
          hystrix:
            enabled: true
        
      • ReceiveMsgImpl.java

        package com.example.deal.service.impl;
        
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.annotation.StreamListener;
        import org.springframework.cloud.stream.messaging.Sink;
        import org.springframework.messaging.Message;
        import org.springframework.stereotype.Component;
        
        import com.example.deal.service.ReceiveMsg;
        
        /**
         * 
         * @ClassName::ReceiveMsgImpl
         * @author :Dustyone
         * @date :2019年4月3日 上午11:06:25
         */
        @Component
        @EnableBinding(value = { Sink.class })
        public class ReceiveMsgImpl implements ReceiveMsg {
        
        	/**
        	 * 引入日志,注意都是"org.slf4j"包下
        	 */
        	private final static Logger logger = LoggerFactory.getLogger(ReceiveMsgImpl.class);
        
        	@Override
        	@StreamListener(Sink.INPUT)
        	public void receiveTime(Message<String> message) {
        		logger.info("接收消息" + message.getPayload().toString());
        	}
        
        }
        
        
      • SendMsgImpl.java

        package com.example.deal.service.impl;
        
        import java.text.SimpleDateFormat;
        import java.util.Date;
        
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.cloud.stream.annotation.EnableBinding;
        import org.springframework.cloud.stream.messaging.Source;
        import org.springframework.context.annotation.Bean;
        import org.springframework.integration.annotation.InboundChannelAdapter;
        import org.springframework.integration.annotation.Poller;
        import org.springframework.integration.core.MessageSource;
        import org.springframework.messaging.support.GenericMessage;
        
        import com.example.deal.service.SendMsg;
        
        /**
         * 
        * @ClassName::SendMsgImpl 
        * @author :Dustyone
        * @date :2019年4月3日 上午11:05:08
         */
        
        @EnableBinding(value = { Source.class })
        public class SendMsgImpl implements SendMsg {
        
        	/**
        	 * 引入日志,注意都是"org.slf4j"包下
        	 */
        	private final static Logger logger = LoggerFactory.getLogger(SendMsgImpl.class);
        
        	private String format = "yyyy-mm-dd  HH:mm:ss";
        	
        	
        	@Bean
        	@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000", maxMessagesPerPoll = "1"))
        	@Override
        	public MessageSource<String> sendTime() {
        		logger.info(new SimpleDateFormat(format).format(new Date()));
        		return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
        	}
        
        }
        
        
      • ReceiveMsg.java

        package com.example.deal.service;
        
        import org.springframework.messaging.Message;
        
        /**
         * 
         * @ClassName::ReceiveMsg
         * @author :Dustyone
         * @date :2019年4月3日 上午11:06:09
         */
        public interface ReceiveMsg {
        	public void receiveTime(Message<String> message);
        }
        
        
      • SendMsg.java

        package com.example.deal.service;
        
        import org.springframework.integration.core.MessageSource;
        
        /**
         * 
        * @ClassName::SendMsg 
        * @author :Dustyone
        * @date :2019年4月3日 上午11:04:30
         */
        public interface SendMsg {
        
        	public MessageSource<String> sendTime();
        
        }
        
        
  • 分别运行Publisher和Subscribe

    • Publisher
      SpringCloud微服务 Stream集成RabbitMQ(一)
    • Subscribe
      SpringCloud微服务 Stream集成RabbitMQ(一)

小结

  • 完整源码参考
    microservice-deal-cloud-stream-rabbitmq-publisher和microservice-deal-cloud-stream-rabbitmq-subscribe