Rabbitmq基础知识与故障解决,死信队列、顺序消费、幂等性
rabbitmq基础架构图
消息队列执行过程
1.客户端连接到消息队列服务器,打开一个Channel。
2.客户端声明一个Exchange,并设置相关属性。
3.客户端声明一个Queue,并设置相关属性。
4.客户端使用Routing key,在Exchange和Queue之间建立好绑定关系。
5.客户端投递消息到Exchange。
Exchange接收到消息后,就根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个队列里。有三种类型的Exchanges:direct,fanout,topic,每个实现了不同的路由算法(routing algorithm):
Direct exchange:完全根据key进行投递的叫做Direct交换机。如果Routing key匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。例如,绑定时设置了Routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Fanout exchange:不需要key的叫做Fanout交换机。它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
Topic exchange:对key进行模式匹配后进行投递的叫做Topic交换机。比如符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。
消息接收确认
消息消费者如何通知 Rabbit 消息消费成功?
-
消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
-
自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
-
如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
-
如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
-
如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
-
ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟
-
消息确认模式有:
-
-
AcknowledgeMode.NONE:自动确认
-
AcknowledgeMode.AUTO:根据情况确认
-
AcknowledgeMode.MANUAL:手动确认
-
什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
RabbitMQ的死信队列
对rabbitmq来说,产生死信的来源大致有如下几种:
1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
2.消息TTL过期
3.队列达到最大长度(队列满了,无法再添加数据到mq中)
死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种:
1.丢弃,如果不是很重要,可以选择丢弃
2.记录死信入库,然后做后续的业务分析或处理
3.通过死信队列,由负责监听死信的应用程序进行处理
延时队列
想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
从下图可以大致看出消息的流向:
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
顺序消费
1、发送的顺序消息,必须保证在投递到同一个队列,且这个消费者只能有一个(独占模式)
2、然后同意提交(可以合并一个大消息,或拆分多个消息,最好是拆分),并且所有消息的会话ID一致
3、添加消息属性:顺序表及的序号、本地顺序消息的size属性,进行落库操作
4、并行进行发送给自身的延迟消息(带上关键属性:会话ID、SIZE)进行后续处理消费
5、当收到延迟消息后,根据会话ID、SIZE抽取数据库数据进行处理即可
6、定时轮询补偿机制,对于异常情况
可靠性投递
消费端-幂等性保障
什么情况下会出现重复消费?
当消费者消费完消息时,在给生产端返回ack时由于网络中断,导致生产端未收到确认信息,该条消息会重新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。
如何避免消息的重复消费问题?
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息
业界主流的幂等性操作:
-
唯一ID + 指纹码机制,利用数据库主键去重
事务场景
崩溃恢复
1.大量消息在mq里积压了几个小时了还没解决
场景:几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不行。一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条。
所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
解决方案:
这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下:
①先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
②临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。
③然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
④紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。
⑤这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。
⑥等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。
2.消息设置了过期时间,过期就丢了怎么办
假设你用的是rabbitmq,rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
解决方案:
这种情况下,实际上没有什么消息挤压,而是丢了大量的消息。所以第一种增加consumer肯定不适用。
这种情况可以采取 “批量重导” 的方案来进行解决。
在流量低峰期(比如夜深人静时),写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。
3.积压消息长时间没有处理,mq放不下了怎么办
如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?
解决方案:
这个就没有办法了,肯定是第一方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。
首先,临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力,然后走第二种方案,在晚上夜深人静时去手动查询重导丢失的这部分数据。