[Spring Cloud] Stream: Message-Driven Services

Note: this article is based on Spring Boot 2.1.1.RELEASE.

I. Producer

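  1. Add the dependencies to pom.xml
<!-- Spring Cloud Stream with the RabbitMQ binder; the version is assumed to be managed by the spring-cloud-dependencies BOM -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- Swagger starter, needed only for the @Api and @EnableSwagger2Doc annotations used below -->
<dependency>
    <groupId>com.spring4all</groupId>
    <artifactId>swagger-spring-boot-starter</artifactId>
    <version>1.7.0.RELEASE</version>
</dependency>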
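  2. Create the output channel for sending messages
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// @EnableBinding on the application class registers this interface as a bean,
// so it needs no @Component annotation.
public interface ISendMsg {

    // An output channel only needs send(), so MessageChannel is sufficient
    @Output("bookMsgChannel")
    MessageChannel sendMsg();
}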
  3. Send a message
import com.springclouddemo.bookservice.msg.ISendMsg;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@Api("Bookstore service API")
@RestController
@Slf4j
public class BookService {

    @Autowired
    private ISendMsg sendMsg;

    @RequestMapping("/updateBookPrice")
    public String updateBookPrice(){
        String msg = "Message from the producer " + System.currentTimeMillis();
        // Encode with an explicit charset so the payload does not depend on the platform default
        Message<byte[]> message = MessageBuilder.withPayload(msg.getBytes(StandardCharsets.UTF_8)).build();
        sendMsg.sendMsg().send(message);
        return "updateBookPrice";
    }
}
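The producer sends the text as a byte array and the consumer reads it back as a String, so both sides have to agree on a charset. The following is a minimal sketch in plain Java (no Spring involved), assuming UTF-8 on both ends; the class and method names are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;

final class PayloadEncodingDemo {

    // Producer side: turn the text into bytes with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: rebuild the text with the same charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Book price updated " + System.currentTimeMillis();
        byte[] payload = encode(original);
        // The round trip is lossless as long as both sides use the same charset
        System.out.println(decode(payload).equals(original));
    }
}
```

Calling `getBytes()` without an argument uses the platform default charset, which can differ between the machines that run the producer and the consumer, so naming the charset avoids garbled non-ASCII text.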
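  4. Bind the channel in the application class
import com.spring4all.swagger.EnableSwagger2Doc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableSwagger2Doc
@EnableBinding(ISendMsg.class)
public class BookServiceApp {
    public static void main(String[] args){
        SpringApplication.run(BookServiceApp.class, args);
    }
}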
  5. Start the service; the exchange bookMsgChannel now appears in the RabbitMQ management console
http://127.0.0.1:15672

II. Consumer

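  1. Define the input channel interface
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

// @EnableBinding on the application class registers this interface as a bean,
// so it needs no @Component annotation.
public interface IReadMsg {
    // The name must match the producer's channel so both bind to the same exchange
    @Input("bookMsgChannel")
    SubscribableChannel readMsg();
}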
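  2. Listen for messages
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class BookService {

    // Invoked for every message that arrives on the bookMsgChannel input
    @StreamListener("bookMsgChannel")
    public void updateBookPrice(String msg){
        log.info(msg);
    }
}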
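  3. Bind the channel in the application class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(IReadMsg.class)
public class BookWebsiteApp {

    public static void main(String[] args){
        SpringApplication.run(BookWebsiteApp.class, args);
    }
}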
  4. Start the consumer service; RabbitMQ now shows a queue bound to the bookMsgChannel exchange

  5. Call the producer endpoint to send a message; the consumer receives it and logs it
http://127.0.0.1:8801/updateBookPrice

III. Message Groups

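  1. When the consumer service runs as a cluster, every instance receives its own copy of each message, so the message is consumed more than once. To avoid this, put all instances in the same consumer group so that only one of them handles each message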
spring:
  application:
    name: BookWebsite
  cloud:
    stream:
      bindings:
        bookMsgChannel:
          group: bookwebsitegroup
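
The effect of the group setting can be illustrated with a small simulation in plain Java. This is only a sketch of the delivery rule, not how Spring Cloud Stream or RabbitMQ is implemented: it assumes each message goes to exactly one instance per group, chosen round-robin, and it models an instance without a group as the sole member of its own anonymous group. The class name, method name, and instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConsumerGroupDemo {

    // instanceGroups maps an instance name to the group it belongs to.
    // Returns, for every instance, the messages it received.
    static Map<String, List<String>> deliver(List<String> messages,
                                             Map<String, String> instanceGroups) {
        Map<String, List<String>> members = new LinkedHashMap<>();  // group -> instances
        Map<String, List<String>> received = new LinkedHashMap<>(); // instance -> messages
        for (Map.Entry<String, String> e : instanceGroups.entrySet()) {
            members.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            received.put(e.getKey(), new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Every group gets the message once; one member of the group handles it
            for (List<String> group : members.values()) {
                String target = group.get(i % group.size());
                received.get(target).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");

        // No group configured: each instance is alone in an anonymous group
        Map<String, String> ungrouped = new LinkedHashMap<>();
        ungrouped.put("website-1", "anonymous-1");
        ungrouped.put("website-2", "anonymous-2");
        System.out.println(deliver(messages, ungrouped));

        // Both instances share the group from the configuration above
        Map<String, String> grouped = new LinkedHashMap<>();
        grouped.put("website-1", "bookwebsitegroup");
        grouped.put("website-2", "bookwebsitegroup");
        System.out.println(deliver(messages, grouped));
    }
}
```

In the ungrouped run both instances receive all four messages; in the grouped run the four messages are split between the two instances, which is the behavior the group setting is meant to achieve.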