springCloud-Stream 消息中间件的学习与使用(简单快速入门使用)
springCloud-Stream 消息中间件的学习与使用(简单快速入门使用)
1.发布与订阅的接口注册定义:
package com.weijs.demo.springcloudstream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface UserMessageEvent {
//生产(发布者)
String USER_PRODUCER = "USER_PRODUCER";
//消费(订阅者)
String USER_CONSUMER_1 = "USER_CONSUMER_1";//消费1
String USER_CONSUMER_2 = "USER_CONSUMER_2";//消费2
@Input(USER_CONSUMER_1)
MessageChannel userConsumer1();
@Input(USER_CONSUMER_2)
SubscribableChannel userConsumer2();
@Output(USER_PRODUCER)
MessageChannel userProducer();
}
2.生产者(即发布):
调用接口发布消息
package com.weijs.demo.springcloudstream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequestMapping("/user")
@RestController
public class UserProducer {
@Autowired
UserMessageEvent userMessageEvent;
@RequestMapping(value = "/send")
public void sendBrandAdd(@RequestParam String message, HttpServletRequest request) {
log.info("========【接口接收MQ发送内容参数】" + message);
Map<String, Object> map = new HashMap<>();
map.put("routingKey", "add");//设置routingKey
boolean result = userMessageEvent.userProducer().send(MessageBuilder.withPayload(message).copyHeaders(map).build());
// //不需要设置routingKey
// userMessageEvent.userProducer().send(MessageBuilder.withPayload(message).build());
if (result) {
log.info("=======send message seccess!");
}
}
}
3.消费者(即订阅者):
这里定义有个订阅者去消费消息,如果配置成功,这2个订阅者都会受到生产者发布的消息
package com.weijs.demo.springcloudstream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class UserConsumer {
/**
* 订阅者1
* @param message
*/
@StreamListener(UserMessageEvent.USER_CONSUMER_1)
public void receive(Message<String> message) {
log.info("订阅者1========消费消息:{}",message);
log.info("订阅者1========消费消息payload:{}",message.getPayload());
log.info("订阅者1========消费消息headers:{}",message.getHeaders());
}
/**
* 订阅者2
* @param message
*/
@StreamListener(UserMessageEvent.USER_CONSUMER_2)
public void receive2(Message<String> message) {
log.info("订阅者2=========消费消息:{}",message.getPayload());
}
}
4.配置文件进行对订阅发布的绑定:
如配置文件application.yml,这是队列使用是rabbitMQ,也可以换成kafka,所以配置mq的连接
#rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#cloud-stream
cloud:
stream:
bindings:
USER_PRODUCER: //这个就是接口UserMessageEvent定义的发布者
content-type: application/json //传输内容的格式
destination: userExchange //这个就是rabbitmq的交换机exchange
USER_CONSUMER_1: //这个就是接口UserMessageEvent定义的订阅者1
content-type: application/json
destination: userExchange
USER_CONSUMER_2: //这个就是接口UserMessageEvent定义的订阅者2
content-type: application/json
destination: userExchange