RabbiMQ是什么?看完你就知道了

AMQP是什么?

一、简介

是一个性能也许不是很高,但是是一个可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1、作用与特性

  • 系统解耦

  • 冗余(存储数据)

  • 可扩展

    • 集群部署
  • 可用性

    • 镜像队列
  • 削峰

  • 可恢复性

  • 缓冲

  • 异步通信

  • 可靠性

    • 生产者消息确认
    • 消费者消息确认
    • 相关组件支持持久化
  • 灵活的路由

  • 支持多种协议

  • 支持多种语言客户端

  • 支持友好管理界面

  • 支持插件机制

2、相关组件介绍

RabbiMQ是什么?看完你就知道了

Rabbit是AMQP协议的一个实现,所以在熟悉了AMQP协议之后可以更好地使用Rabbit

  • 生产者与消费者
  • 队列:用于存储消息
  • 交换机:根据绑定建与路由键将消息投递到队列
    • 主题 topic
      • 通过“.”分割字符串
      • “*”匹配一个单词
      • “#”匹配一个或多个单词
      • 留意这里跟正则表达式有些许区别
    • 直接 direct
      • 路由键完全匹配绑定键
    • 扇出 fanout
    • 头 header
  • 绑定:将交换机跟队列,或将交换机跟交换机通过某种规则绑定起来
    • 绑定键:投递的某种规则,根据交换机类型所觉得的匹配规则
    • 路由键:投递消息时代表这条消息的投递规则,由交换机决定是否击中绑定键而进行投递
  • 连接:TCP协议建立的一条可靠连接
  • 信道:建立在链接上面的虚拟链接

3、运转流程

以下流程指的是比较通用的业务场景

  • 生产者侧
    • 链接消息队列并建立信道
    • 声明一个交换机
    • 声明一个队列
    • 绑定交换机与队列之间的关系
    • 指定路由键并发送消息
    • 关闭信道
    • 关闭连接
  • 消费者侧
    • 链接消息队列并建立信道
    • 声明一个交换机
    • 声明一个队列
    • 绑定交换机与队列之间的关系
    • 订阅某条队列并接收消息
    • 消息确认
    • 关闭信道
    • 关闭连接

二、开发要点

1、连接RabbitMQ

信道不能在线程间共享,可能会出现错误的通信帧交错,同时也会影响发送确认的问题

2、交互动作

2.1、交换机的描述exchangeDeclare

相关的属性

  • exchange:交换器名称
  • type:类型
  • durable:是否持久化
  • autoDeleted:是否自动删除,至少有一个队列或交换器与改交换器绑定,之后所有与这个交换器绑定的队列或交换器与此解绑。
  • inernal:是否内置。内置交换机只能通过交换路由到交换器这种方式使用
  • argument:其他一些结构化参数

类似的方法

  • exchangeDeclarePassive:描述的交换器必须存在,否则抛出异常
  • exchangeDeclareNoWait:不需要服务器返回OK,建议不用,防止创建失败

注意事项

  • 生产者跟消费者都可以声明一个交换器与队列,如果尝试声明一个已经存在的交换器或队列,如果参数完全匹配,那么不做任何事情。如果发现不匹配,出现冲突则响应一个异常。

2.2、队列的描述queueDeclared

相关属性

  • queue:队列的名称
  • durable:是否持久化
  • exclusive:设置是否排他,排他的意思是只对当前连接可见。
  • autoDeleted:是否删除。至少有一个消费者连接到这个队列,之后所有与这个队列的消费者都断开时,才会自动删除。因为生产者创建了这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
  • arguments:队列参数

类似方法

  • queueDeclareNoWait:不需要服务器返回OK,建议不使用
  • queueDeclarePassive:描述的队列必须存在,否则响应异常

2.3、队列绑定queueBind

相关属性

  • queue:队列名称
  • exchange:交换机名称
  • routingKey:绑定键
  • argument:绑定参数

2.4、交换机绑定exchangeBind

指的是将交换机于交换机绑定到一起,使用方法跟队列绑定相似

2.5、发布消息basicPublish

相关参数

  • exchange:交换器名称
  • routingKey:路由键
  • props:基本属性集
  • mandatory:确认是否能投递到队列中,如果不是则返回
  • immediate:是否立即投递到消费者手里,如果不是则返回

2.6、推模式订阅消息basicConsume

相关属性

  • queue:队列名称
  • autoAck:设置是否自动确认
  • consumerTag:设置消费者标签
  • noLocal:设置为true表示不能将同一个链接中生产者发送的消息传送给这个链接中的消费者
  • exclusive:设置是否排他
  • argument:其他参数
  • callback:消费者回调函数,用来处理消息

注意事项:会收到Basic Qos的限制

2.7、拉模式订阅消息basicGet

建议不要使用while+basicGet会严重影响性能

2.8、消费端的确认与拒绝

  • 订阅队列时制定autoAck参数
    • true:自动确认
    • false:等待消费者确认
  • 拒绝一条:basicReject
  • 拒绝之前所有:basicNack
  • 请求重新发送还未确认的消息:basicRecover

三、高阶特性

1、mandatory

确认消息必须投递到队列中,如果无法匹配则返回生产者

2、immediate

  • 确认消息必须投递到消费者手中,该方法已过期
  • 使用死信队列加生存时间代替处理此问题

3、备份交换器AE

RabbiMQ是什么?看完你就知道了

未被路由到的消息可以投递到该交换机

配置方法:在描述交换机的时候添加“altermate-exchange”指定一个备份交换机。

特殊情况

  • 如果不存在备份交换机,那么消息会丢失
  • 如果备份交换机,没有绑定任何队列,那么消息会丢失
  • 如果备份交换机跟mandatory参数一起使用,那么mandatory参数无效

4、过期时间(TTL)

指的是消息超过这个存活时间将会被消息队列剔除

  • 通过消息参数设置“expiration”
  • 通过队列参数设置“x-message-ttl”

如果是通过队列的属性设置,会从队列头固定扫描剔除过期消息。如果通过消息属性设置,那么会在被消费的时候剔除

5、死信队列(DLX)

RabbiMQ是什么?看完你就知道了

当消息变成死信的时候,将会被投递到该交换器中

消息变成死信的特征

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

设置方法:在描述队列时,加入x-dead-letter-exchange参数指定一条死信队列

6、延迟队列

rabbit没有这种队列的默认实现,但是可以通过死信队列+TTL来完成模拟

7、优先级队列

优先级高的消息具备优先被消费的特权

设置方法:通过队列的x-max-priority参数设置

注意事项:如果消息者消费的速度太快则体现不了该能力

8、RPC方案支持

方案流程:

  • 调用方把参数放置进一个RPC队列,并制定回调队列
  • 被调用方订阅RPC队列,进行计算后把结果投递到调用方指定的回调队列
  • 调用方从调用队列里面获取响应内容

9、持久化

  • 交换机持久化
  • 队列持久化
  • 消息持久化

注意事项

  • 三个组件都需要设置持久化属性,那么持久化才能起到效果
  • 持久化会严重影响性能
  • 落盘不是实时的
  • 并不能保证100%的可靠性

10、生产者确认机制

  • 事务

    • 相关命令
      • channel.txSelect
      • channel.txCommit
      • channel.txRollback
    • 执行出错,捕捉异常,队列回滚
    • 非常消耗性能
  • 发送方确认

    • 设置方法:channel.confirmSelect

    • 确认方式:

      • 单条确认:channel.waitForConfirms

      • 批量确认:即等待到一定数量的消息之后再执行确认动作。批量重传会影响性能。

      • 异步确认(建议):通过ConfirmListener来异步监听,通过SortedSet来存储那些未被确认的消息id。编程会比较复杂

  • 注意这个机制跟mandatory存在区别,生产者确认机制是确保消息可以到达rabbitmq,而不保证能到达队列。切确认机制跟事务机制是互斥的,不能同时使用

11、消费端要点

  • 消息分发
    • 同队列轮询分发
    • basicQos控制channal最多接收的消息数,使用gobal控制整个信道的消息数
  • 消息顺序性保证
    • 在多种场景下均可能发生消息顺序不一致的场景
    • 可以使用序列id对消息进行标志,使用缓存等中间件进行重排序
  • 消息传输保障
    • 最多发一次(存在消息丢失的可能性,消息不会重复)
    • 最少发一次(存在消息重复的可能性)
      • 发送确认
      • 投递确认
      • 持久化处理
      • 接收处理
    • 恰好发一次

11、消费端要点

  • 消息分发
    • 同队列轮询分发
    • basicQos控制channal最多接收的消息数,使用gobal控制整个信道的消息数
  • 消息顺序性保证
    • 在多种场景下均可能发生消息顺序不一致的场景
    • 可以使用序列id对消息进行标志,使用缓存等中间件进行重排序
  • 消息传输保障
    • 最多发一次(存在消息丢失的可能性,消息不会重复)
    • 最少发一次(存在消息重复的可能性)
      • 发送确认
      • 投递确认
      • 持久化处理
      • 接收处理
    • 恰好发一次
      • 难以实现