8.Spring Boot RabbitMQ
[作者信息] github: https://github.com/MiniPa email: [email protected]
[项目代码:https://github.com/MiniPa/spring_boot_demos]
===============================================================================
[参考文章] 安装这种事情稍微练练太简单了,请自行搞定,下面给出推荐链接
RabbitMQ download Erlang download
https://www.zouyesheng.com/rabbitmq.html
https://github.com/401Studio/WeekLearn/issues/2
RabbitMQ + Erlang Linux安装指导 + other1
[RabbitMQ]
MQ: Message Queue 消息队列,实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用 RocketMQ,RabbitMQ,ActiveMQ 标准用法:生产者消费者 其他用法:分布式事物支持,RPC调用
RabbitMQ 官网 是实现AMQP(高级消息队列协议)的消息中间件的一种 服务器端用Erlang语言编写,支持多种客户端 易用性、扩展性、高可用性 AMQP:是应用层协议的一个开放标准,为面向消息的中间件设计 特征: 面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全
[补充内容] 队列服务:发消息者,队列,收消息者 Rabbit: 做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列
(此图百度知识上有详细介绍)
[Rabbit概念/角色] 1.虚拟主机:一个虚拟主机持有一组交换机、队列和绑定, 是用户权限控制粒度所在. 禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机 RabbitMQ服务器都有一个默认的虚拟主机"/" 2.交换机: Exchange 只转发不存储,若未绑定Queue则丢弃 3.路由键: 消息到交换机,交换机根据路由键决定转到那个队列 4.绑定: 交换机和队列多对多的关系 5.Producer && Consumer 生产者和消费者 6.队列: Queue 7.消息: Message
[Rabbit重要机制] 1.持久化 Exchange/Queue/Message等落地 2.调度策略 Exchange如何把消息分给Queue 3.分配策略 Queue如何分配消息给Consumner 4.状态反馈
|
[1.持久化]
1.默认情况下,RabbitMQ突然崩溃,消息/队列/交换器都不具有持久化的性质。 2.交换器和队列在申明时候可通过一个durable参数实现 (exchange='first', type='fanout', durable=True) 中断重启 Exchange 和 Queue都会恢复,但Queue中Message不会 3.消息通过properties属性,做成持久化 (exchange='first', routing_key='', body='Hello World!', properties=pika.BasicProperties( delivery_mode = 2, )) 消息的持久化并不是一个很强的约束, 涉及数据落地的时机, 及系统层面的 fsync 等问题, 不要认为消息完全不会丢. 需要配置其它的一些机制, 比如后面会谈到的 状态反馈 中的 confirm mode.来共同提高消息持久化. |
[2.调度策略:交换机Exchange]
接收消息并且转发到绑定的队列,交换机不存储消息 启用ack模式后,交换机找不到队列会返回错误
1.Direct: 先匹配, 再投送. 绑定时设定的 routing_key 和消息的routing_key 匹配时,投送. default 2.Topic: 按规则转发消息(最灵活) 根据通配符 3.Headers: 设置header attribute参数类型的交换机 4.Fanout: 转发消息到所有绑定队列 [Direct]
[Topic] 队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 e.g.shanghai..b.*: 第一单词shanghai,第四单词b, shanghai.a.# 以shanghai.a.开头的都匹配 #代表一个或多个单词, *代表一个单词 3.发送代码
[Headers] 在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列
[Fanout] |
[Spring Boot RabbitMQ 1对1]
1.pom AMQP spring-boot-starter-amqp 2.application.properties
3.队列配置
4.发送者
5.接收者
6.测试
(原理就是这样拉注意queue名字要保持一致否则接收不到) |
[Spring Boot RabbitMQ]
(代码就不贴了,可以在github上看) 3.Topic Exchange: 根据row_key绑定不同的队列 4.Fanout Exchange: 广播啦 |
[3.分配策略]
1.关闭 ack 的情况下, Queue 一旦有消费者请求, 那么当前队列中的消息它都会一次性吐很多出去 打开 ack, Queue 一次给一条消息,默认ack是打开的 channel.basic_qos(prefetch_count=2) 可设置一次取多少Message |
[4.状态反馈]
1.目的:确认行为的结果. 如消息是否送达到Queue,消息是否正常被Comsumer消费了 2.方法:AMQP中要保证消息的业务可靠性只能使用事务,不过在RabbitMQ中有相应的简单扩展机制达到目的
这部分内容比较多,暂停研究,后续跟进 // TODO
|