【Spring Cloud】Stream消息驱动
注:文章基于SpringBoot 2.1.1.RELEASE
一、消费者
- pom.xml文件引入依赖
<dependency>
<groupId>com.spring4all</groupId>
<artifactId>swagger-spring-boot-starter</artifactId>
<version>1.7.0.RELEASE</version>
</dependency>
- 创建消息发送通道
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface ISendMsg {
@Output("bookMsgChannel")
SubscribableChannel sendMsg();
}
- 发送消息
import com.springclouddemo.bookservice.msg.ISendMsg;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Api("书城服务接口")
@RestController
@Slf4j
public class BookService {
@Autowired
private ISendMsg sendMsg;
@RequestMapping("/updateBookPrice")
public String updateBookPrice(){
String msg = "来自生产者的消息 " + System.currentTimeMillis();
Message message = MessageBuilder.withPayload(msg.getBytes()).build();
sendMsg.sendMsg().send(message);
return "updateBookPrice";
}
}
- 启动类**绑定
@SpringBootApplication
@EnableEurekaClient
@EnableSwagger2Doc
@EnableBinding(ISendMsg.class)
public class BookServiceApp {
public static void main(String[] args){
SpringApplication.run(BookServiceApp.class, args);
}
}
- 启动服务,在RabbitMQ中可以看到创建的交换机bookMsgChannel
http://127.0.0.1:15672
二、消费者
- 消息通道接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface IReadMsg {
@Input("bookMsgChannel")
SubscribableChannel readMsg();
}
- 监听消息
@Slf4j
@RestController
public class BookService {
@StreamListener("bookMsgChannel")
public void updateBookPrice(String msg){
log.info(msg);
}
}
- 启动类**绑定
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(IReadMsg.class)
public class BookWebsiteApp {
public static void main(String[] args){
SpringApplication.run(BookWebsiteApp.class, args);
}
}
- 启动消费者服务,在RabbitMQ看到已绑定
- 生产者发送消息,消费者得到消息
http://127.0.0.1:8801/updateBookPrice
### 三、消息分组
- 当消费者服务做集群时,消息会被多次消费,避免此情况可使用消息分组
spring:
application:
name: BookWebsite
cloud:
stream:
bindings:
bookMsgChannel:
group: bookwebsitegroup