RabbitMQ

https://my.oschina.net/AnnaWu/blog/2966181

1、RabbitMQ简介

RabbitMP简介:

  • RabbitMQ是开源的消息代理队列服务器,用来通过普通协议在完全不同的应用之间 共享数据,RabbitMQ底层是用了Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。消息以管道的方式进行传递。

  • RabbitMQ不仅仅可以使用java客户端进行编写,且可以使用其他的语言(python,php等…),它提供了丰富的API

使用场景:

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

RabbitMQ的优点:

  • 开源,性能优秀,稳定性保障

  • 与SpringAMQP完美的整合,API丰富 (Spring基于RabbitMQ 提供了一套框架,叫做AMQP框架)这套框架不仅提呈了原生的RabbitMQ,而且还提供了丰富可扩张的API帮助开发人员更好的去应用

  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
    说明:(保证数据不丢失的提前做到高可靠性,可用性)普遍使用的镜像队列模式

  • AMQP全称:Advanced Message Queuing Protocl AMQP翻译过来:高级消息队列协议

为什么选择RabbitMQ:

现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?

  • 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
  • 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  • 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  • 集群部署简单,正是因为Erlang使得RabbitMQ集群部署变的超级简单;
  • 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

2、AMQP模型图

RabbitMQ

  • Publisher(生产者)把消息投递到server上,然后经过Virtual host再到Exchange就可以了。生产者不需要关心我的消息投递到哪个队列。

  • Consumer(消费者) 只需要和Messager Queue 进行 监听绑定就可以了,从而实现 队列级别的 结偶.

  • ExchangeMessage Queue 之间有一个绑定的关系,然后通过 路由key 关联,也就是说消息发到Exchange上,然后通过某种路由规则,把消息路由到某一个队列上,这其实是RabbitMQ的核心生产者消费者的关系是通过 路由 进行关联的

3、RabbitMQ架构图

RabbitMQAMQP核心概念

  • Server:又称Broker接受客户端的连接实现AMQP实体服务.

  • Connection:连接,应用程序与Broker的网络连接.

  • Channel:网络 信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务.

  • Message:消息,服务器应用程序之间传送的数据,由PropertiesBody组成.Properties可以对消息进行修饰,比如消息的优先级以及延迟等高级特性;Body则就是消息体内容

  • Virtual Host:虚拟地址用于进行逻辑隔离,最上层的消息路由,Rabbit可以创建多个vhost(多个vhost是隔离的,多个vhost无法通讯,所以不用担心命名冲突),一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有 相同名称的 Exchange或Queue.

  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列

  • Binding:ExchangeQueue之间的虚拟连接,Binding中可以包含routing key

  • Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息

  • Queue:也称为Message Queue消息队列保存消息并将它们转发给消费者

RabbitMQ消息流转图:

举例:生产者发送了一个消息给Message,指定了 exchange name叫做e1 ,正好Exchange 也叫做e1,指定一个routing key :123

消息规则路由过来,Message Queue 123 和routing key 123就匹配上了
RabbitMQ

4、RabbitMq的整理 exchange、route、queue关系

转自
https://blog.csdn.net/samxx8/article/details/47417133

4.1 声明MessageQueue

在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

a) 消费者是无法订阅或者获取不存在的MessageQueue中信息

b) 消息被Exchange接受以后,如果没有匹配的Queue则会被丢弃

在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:

a) Exclusive排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

b) Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列

c) Durable:持久化,这个会在后面作为专门一个章节讨论。

d) 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

4.2 生产者发送消息

在AMQP模型中,Exchange是接受生产者消息将消息路由到消息队列的关键组件。ExchangeTypeBinding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。可以通过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。下面我们就分析一下不同的ExchangeType表现出的不同路由规则。

生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType:

a) 如果是Direct类型,则会将消息中RoutingKey与该Exchange关联的所有Binding中BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。

RabbitMQ

b) 如果是 Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为

RabbitMQ

c)如果是Topic类型,则会按照正则表达式对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。

RabbitMQ

4.3 消费者订阅消息

在RabbitMQ中消费者有2种方式 获取队列中的消息:

a) 一种是通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。

b) 另外一种方式是通过basic.get命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。

如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。

这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个即可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者,与RPC不同。 如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息

既然RabbitMQ提供了ACK某一个消息的命令,当然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。

4.4 持久化:

Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候将delivery mode设置为2只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。


消息持久化:

Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启 服务器 会导致 消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化

当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件

1、投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;

2、设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘

3、消息已经到达 持久化 交换器上

4、消息已经到达 持久化的 队列

持久化工作原理:

Rabbit会将你的持久化消息 写入磁盘上的 持久化 日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收

持久化的缺点:

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

4.5 事务

对事务的支持AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令 本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit() 提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会被服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

4.6 Confirm机制

使用事务固然可以保证只有提交的事务才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后才会返回ConfirmConfirm机制最大优点在于 异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack表示消息已经丢失,这样生产者就可以通过重发消息保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息系统需要支持去重