RabbitMq 学习笔记
RabbitMq 学习笔记
一、基础流程
P–>消息生产者:消息的发送方
X–>消息交换机:根据一定的路由规则,将消息传递到队列
Q–>队列:存放消息的队列
C–>消息消费者:监听指定队列
二、相关概念
虚拟主机
一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。可以功过rabbitMQ管理界面来新建。
交换机
Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
类型名称 | 类型描述 |
---|---|
fanout | 把所有发送到该Exchange的消息路由到所有与它绑定的Queue中 |
direct | Routing Key==Binding Key(默认) |
topic | 比较灵活的匹配机制 |
headers | 不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 |
队列
存储消息,队列的特性是先进先出。
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
绑定
也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
管理界面
rabbitMq 提供了默认的管理界面,默认帐号为guest。可以查看队列,交换机信息,创建用户及虚拟主机
补充说明
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Connection就是建立一个TCP连接,生产者和消费者的都是通过TCP的连接到RabbitMQ Server中的,这个后续会再程序中体现出来。
Channel虚拟连接,建立在上面TCP连接的基础上,数据流动都是通过Channel来进行的。为什么不是直接建立在TCP的基础上进行数据流动呢?如果建立在TCP的基础上进行数据流动,建立和关闭TCP连接有代价。频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。
三、交换机分类讨论
(1)directExchange
这是rabbitMq的默认交换机,当且仅当生产者提供的routing key与队列绑定到该交换机的binding key一致时,该交换机才将生产者发布的消息路由到队列中。
directExchange的配置:
@Configuration
public class RabbitConfigure {
// 构建队列
@Bean
public Queue helloQuery() {
return new Queue("hello");//传递普通的字符串消息
}
@Bean
public Queue userQuery() {
return new Queue("user");//传递java对象消息
}
}
由于为默认交换机,其bindingkey 即为队列的名字
生产者:
@Component
public class HelloSender {
@Autowired
public RabbitTemplate rabbitTemplate;
/**
* 发送给hello队列,基础应用
*/
public void sendHello() {
String message = "hello world " + new Date();
System.out.println("HelloSender sends : " + message);
rabbitTemplate.convertAndSend("hello", message);//这里的"hello"为routingkey
}
/**
* 发送对象给hello对象
*/
public void sendUser() {
User user = new User();
user.setName("eccom");
user.setDeString("test");
user.setAge("10");
System.out.println("HelloSender sends : " + user.getName());
rabbitTemplate.convertAndSend("user", user);
}
}
消费者:
@Component
public class HelloConsumer {
@RabbitListener(queues = "hello") //监听相应队列名字
@RabbitHandler
public void receiveHello(String hello) {
System.out.println("HelloConsumer receive: " + hello);
}
@RabbitListener(queues = "user")
@RabbitHandler
public void reveiveUser(User user) {
System.out.println("HelloConsumerA.receive: " + user);
}
}
运行结果:
(2)fanoutExchange
订阅广播模式,当生产者发送消息后,fanoutExchange会把该消息路由到每一个注册到该交换机上的队列。无需指定bindingKey
fanoutExchange配置:
@Configuration
public class FanoutConfigure {
@Bean
public Queue fanoutMessageA() {
return new Queue("fanout.messageA");
}
@Bean
public Queue fanoutMessageB() {
return new Queue("fanout.messageB");
}
@Bean
public Queue fanoutMessageC() {
return new Queue("fanout.messageC");
}
@Bean
public FanoutExchange exchangeFanout() {//声明一个fanout交换机
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(FanoutExchange fanoutExchange) {//将队列绑定到交换机
return BindingBuilder.bind(fanoutMessageA()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutMessageB()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutMessageC()).to(fanoutExchange);
}
}
生产者:
@Component
public class FanoutSender {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendFanout() {
String message = "这是订阅模式消息";
System.out.println("FanoutSender publish : " + message);
//由于不用指定routerkey所以留空""
rabbitTemplate.convertAndSend("fanoutExchange", "", message);
}
}
消费者:
@Component
public class FanoutConsumer {
@RabbitListener(queues = "fanout.messageA")
@RabbitHandler
public void receiveFanoutA(String hello) {
System.out.println("receiveFanoutA receive: " + hello);
}
@RabbitListener(queues = "fanout.messageB")
@RabbitHandler
public void receiveFanoutB(String hello) {
System.out.println("receiveFanoutB receive: " + hello);
}
@RabbitListener(queues = "fanout.messageC")
@RabbitHandler
public void receiveFanoutC(String hello) {
System.out.println("receiveFanoutC receive: " + hello);
}
}
运行结果:
(3)topicExchange
模糊匹配的交换机,生产者提供的routerkey根据一定的匹配规则,去匹配所有的bindingKey,匹配成功则交换机将收到的消息传递到每一个匹配的队列中
topicExchange配置:
@Configuration
public class TopicConfigure {
// ===============以下是验证topic Exchange的队列==========
// 模糊匹配
@Bean
public Queue topicMessageA() {
return new Queue("topic.messageA");
}
@Bean
public Queue topicMessageB() {
return new Queue("topic.messageB");
}
@Bean
public TopicExchange exchangeTopic() {
return new TopicExchange("topicExchange");
}
//这里设置的匹配规则为topic.messageA
@Bean
public Binding bindingExchangeMessageA(TopicExchange exchange) {
return BindingBuilder.bind(topicMessageA()).to(exchange).with("topic.messageA");
}
//这里的匹配规则为topic.[任意单词]
@Bean
public Binding bindingExchangeMessageB(TopicExchange exchange) {
return BindingBuilder.bind(topicMessageB()).to(exchange).with("topic.#");
}
生产者:
@Component
public class TopicSender {
@Autowired
public RabbitTemplate rabbitTemplate;
public void sendTopicA() {
String message = "能被所有topic队列接收";
System.out.println("TopicSender sender : " + message);
//这里指定交换机后需要输入对应的routingKey
rabbitTemplate.convertAndSend("topicExchange", "topic.messageA", message);
}
public void sendTopicB() {
String message = "只能被一个topic队列接收";
System.out.println("TopicSender sender : " + message);
rabbitTemplate.convertAndSend("topicExchange", "topic.messageB", message);
}
}
消费者:
@Component
public class TopicConsumer {
@RabbitHandler
@RabbitListener(queues = "topic.messageA")
public void receiveTopicA(String meString) {
System.out.println("receiveTopicA receive : " + meString);
}
@RabbitListener(queues = "topic.messageB")
public void receiveTopicB(String meString) {
System.out.println("receiveTopicB receive : " + meString);
}
}
运行结果:
可以看到,当routingkey是topic.messageA时,因为既满足,receiveTopicA监听的队列的bindingKey由满足receiveTopicB监听的队列的bindingKey,所以都能被接收。
而当routingKey是topic.messageB时,只能满足receiveTopicB监听的队列的bindingKey所以只能被receiveTopicB接受。
(4)headersExchange
他不再依赖于bindingKey的匹配规则而是规定一个key-value的键值对来路由消息至队列。
headersExchange配置:
@Configuration
public class HeaderConfigure {
@Bean
public Queue headerQueue() {
return new Queue("header");
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headersExchange");
}
//这里绑定了键值对key=color value=red
//当然可以绑定多组键值对
@Bean
HeadersExchangeMapBindingCreator headersBinding(HeadersExchange headersExchange) {
Map<String, Object> map = new HashMap<>();
map.put("color", "red");
return BindingBuilder.bind(headerQueue()).to(headersExchange).whereAll(map);
// return
// BindingBuilder.bind(headerQueue()).to(headersExchange).where("color").matches("red");
}
}
生产者:
@Component
public class HeadersSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//通过MessageProperties来设定消息头部的键值对
//之后通过SimpleMessageConverter()进行转化
//如果传递的为java对象的话,需要使用 Jackson2JsonMessageConverter()
public void sendHeaders() {
String str = "Heaven's Feel";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("color", "red");
MessageConverter messageConverter = new SimpleMessageConverter();
Message message = messageConverter.toMessage(str, messageProperties);
rabbitTemplate.convertAndSend("headersExchange", "", message);
System.out.println("可接收到的消息发送完毕");
}
public void sendWrongHeaders() {
String str = "Stay Night";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("color", "gold");
MessageConverter messageConverter = new SimpleMessageConverter();
Message message = messageConverter.toMessage(str, messageProperties);
rabbitTemplate.convertAndSend("headersExchange", "", message);
System.out.println("不可接收到的消息发送完毕");
}
}
消费者:
@Component
public class HeaderConsumer {
@RabbitListener(queues = "header")
@RabbitHandler
public void receiveHeader(Message message) {
String body = new String(message.getBody());
System.out.println("receiveHeader: " + body);
}
}
运行结果:
四、消息接受ACK研究
rabbitmq支持接受到消息后返回ACK信息
ACK:消息确认回执
yml配置:
spring:
application:
name: rabbitMq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#发送确认
publisher-confirms: true
#发送失败退回
publisher-returns: true
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
virtual-host: /
*由于增加回执的方法对于rabbitTemplet没什么区别以最简单的directExchange的生产者为例子:
生产者:
@Component
public class HelloSender implements ReturnCallback {//继承returnCallback来接受回执
@Autowired
public RabbitTemplate rabbitTemplate;
/**
* 发送给hello队列,基础应用
*/
public void sendHello() {
String message = "hello world " + new Date();
System.out.println("HelloSender sends : " + message);
rabbitTemplate.setReturnCallback(this);
//构建消息接受通知的函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
System.out.println("HelloSender 消息发送失败" + cause + correlationData.toString());
} else {
System.out.println("HelloSender 消息发送成功");
}
});
rabbitTemplate.convertAndSend("hello", message);
}
/**
* 发送对象给hello对象
*/
public void sendUser() {
User user = new User();
user.setName("eccom");
user.setDeString("test");
user.setAge("10");
System.out.println("HelloSender sends : " + user.getName());
rabbitTemplate.convertAndSend("user", user);
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// TODO Auto-generated method stub
System.out.println("sender return success" + message.toString() + "===" + i + "===" + "s"
+ "===" + s1 + "===" + s2);
}
}
消费者:
@RabbitListener(queues = "hello")
@RabbitHandler
public void receiveHello(String hello, Channel channel, Message message) {
System.out.println("HelloConsumer receive: " + hello);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("receiver success");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
// ack返回false,消息回到队列
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("receiver fail");
}
}
消息确认 因为在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果把上端代码注释,那么消息就算接受到了,但是没有确认,服务器会认为这条消息没有被消费,下次运行仍然会发送。
正常运行结果:
注释后结果:
发送结果中,虽然成功收到了这条消息,但是没有发送ack回执
而管理界面中呈现这条消息仍然在队列中:
之后关闭程序,并重新打开,再没有执行消息发送的情况下,我们的receive仍然会接受到这条消息。
五、管理界面介绍
(1)用户管理
登录默认的管理员帐号guest后,可以通过admin标签进行用户的管理
用户角色:
- NONE:不能访问 management plugin
- management:用户可以通过AMQP做的任何事外加
- 列出自己可以通过AMQP登入的virtual hosts
- 查看自己的virtual hosts中的queues, exchanges 和 bindings
- 查看和关闭自己的channels 和 connections
- 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
- policymaker :management可以做的任何事外加
- 查看、创建和删除自己的virtual hosts所属的policies和parameters
- monitoring:management可以做的任何事外加
- 列出所有virtual hosts,包括他们不能登录的virtual hosts
- 查看其他用户的connections和channels
- 查看节点级别的数据如clustering和memory使用情况
- 查看真正的关于所有virtual hosts的全局的统计信息
- administrator:policymaker和monitoring可以做的任何事外加
- 创建和删除virtual hosts
- 查看、创建和删除users
- 查看创建和删除permissions
- 关闭其他用户的connections
(2)虚拟主机
创建虚拟主机,可以实现在一个程序中将两个rabbitMQ的消息隔离开。
绑定用户与虚拟主机可以有两种方式:
(1) 用户方式
点击新建的用户,进入用户配置界面,配置其可以访问的虚拟主机:
(2)虚拟主机方式
同样的,在新建虚拟主机后,也可通过同样的方式给他配置用户:
选择/test虚拟主机
六、代码中配置多个rabbitMq
由于rabbitMq的用户权限控制的粒度是虚拟主机粒度的,所以为了隔离用户对队列的访问,需要通过不同的虚拟主机来加以限制。
例如:采购订单模型中,采购者只能操作采购者的消息队列,比如发布下单消息,无权操作厂商的消息队列,比如接单消息,在同一个应用程序中,如果不划分不同的虚拟主机,那么采购者在权限上也能操作接单的消息对列。
程序配置
在第五章中建立新的用户、虚拟主机,并绑定用户到虚拟主机后,可以如下图修改yml来为程序绑定多个虚拟主机:
spring:
application:
name: rabbitMq
rabbitmq:
vhost1:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#发送确认
publisher-confirms: true
#发送失败退回
publisher-returns: true
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
virtual-host: /
vhost2:
host: 127.0.0.1
port: 5672
username: customer
password: customer
#发送确认
publisher-confirms: true
#发送失败退回
publisher-returns: true
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
virtual-host: /test
由于有了多个连接配置,就不能再使用springboot自带的注入方式,需要手动配置对rabbitMQ的连接:
@Bean(name = "vhost1")
@Primary
public ConnectionFactory vHost1ConnectionFactory(
@Value("${spring.rabbitmq.vhost1.host}") String host,
@Value("${spring.rabbitmq.vhost1.port}") int port,
@Value("${spring.rabbitmq.vhost1.username}") String username,
@Value("${spring.rabbitmq.vhost1.password}") String password,
@Value("${spring.rabbitmq.vhost1.publisher-confirms}") Boolean publisherConfirms,
@Value("${spring.rabbitmq.vhost1.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean(name = "vhost2")
public ConnectionFactory vHost2ConnectionFactory(
@Value("${spring.rabbitmq.vhost2.host}") String host,
@Value("${spring.rabbitmq.vhost2.port}") int port,
@Value("${spring.rabbitmq.vhost2.username}") String username,
@Value("${spring.rabbitmq.vhost2.password}") String password,
@Value("${spring.rabbitmq.vhost2.publisher-confirms}") Boolean publisherConfirms,
@Value("${spring.rabbitmq.vhost2.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
这里由于陪着了多个同类型的bean需要加上@Primary注解来帮助springboot进行注入。
同理,对之前使用的rabbitmq工具类,listener容器也需要重新手动注入:
// 连接工具
@Bean(name = "vhost1template")
@Primary
public RabbitTemplate v1HostrabbitTemplate(
@Qualifier("vhost1") ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
@Bean(name = "vhost2template")
public RabbitTemplate v2HostrabbitTemplate(
@Qualifier("vhost2") ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
// 容器工厂
@Bean(name = "vhost1Container")
@Primary
public SimpleRabbitListenerContainerFactory vhost1Container(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("vhost1") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//之前为自动注入,现在需要手动调整
factory.setPrefetchCount(1);//该配置项为,每次只分配一条消息当且仅当受到ack时,才发送下一条
return factory;
}
// 容器工厂
@Bean(name = "vhost2Container")
public SimpleRabbitListenerContainerFactory vhost2Container(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("vhost2") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1);
return factory;
}
如上,对于rabbitMQ的多虚拟主机配置就完成了。
之后在运行的过程中出现[队列未注册的问题],其导致的原因为:在不进行手动配置的情况下,springboot默认只绑定队列到第一个虚拟主机(即有@Primary标示)。
解决方案经过测试有两个:
通过配置rabbitAdmin来解决:
@Bean(name="vhost1RabbitAdmin")
public RabbitAdmin vhost1RabbitAdmin(@Qualifier("vhost1") ConnectionFactory connectionFactory ){
return new RabbitAdmin(connectionFactory);
}
但这样会有一个问题,他会将所有queue和exchange的bean对象全部注入两个虚拟主机中,感觉不太受控制。所以我使用的是通过channel来手动注册queue(exchange的方法也大同小异,由于我测试使用的是默认的directExchange所以没有做相关实现)到虚拟主机:
@Bean
public Queue helloQuery(@Qualifier("vhost2") ConnectionFactory connectionFactory) {
Queue queue = new Queue("hello");
try {
connectionFactory.createConnection().createChannel(false).queueDeclare("hello", true,
true, false, null);
//参数分别为队列名,是否持久化,是否排外,是否自动删除,在什么情况下删除
} catch (AmqpException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return queue;
}
这样就手动的把queue(“hello”)绑定到了vhost2
生产者与消费者的改动比较简单
//消费者增加(添加对容器的配置)
@RabbitListener(queues = "hello", containerFactory = "vhost1Container")
//生产者增加(指定rabbitTemplate的名字)
@Resource(name = "vhost1template")
public RabbitTemplate v1rabbitTemplate;
@Resource(name = "vhost2template")
public RabbitTemplate v2rabbitTemplate;
这里为了测试体现效果,我将hello队列分别绑定到了两个虚拟主机并用两个消费者分别监听两个虚拟主机上的hello队列,理想结果是,两个消费者只能获得各自监听的虚拟主机上的队列hello
当都监听vhost1时,发送两条消息的结果:
当分别监听vhost1,vhost2时,分别发送两条的结果:
总结
rabbitMq的使用场景主要用于异步通讯与程序的解耦,目前也只是罗列尝试了一些基本的应用,在spring族的一些开源模块中,对rabbit的应用可能已经被集成在了内部,只需要对yml文件进行配置就可以进行使用了。在这种场景下配置多个rabbit或者多个vhost可能会成为配置上的难点。
ps:对于rabbitmq集群的配置属于安装范畴可以参考:https://blog.csdn.net/jinyidong/article/details/80003362