RabbitMQ之Producer(二)
事务
-
事务机制步骤
- 设置
Channel
为事务模式:channel.txSelect
- 发送信息
- 如果出现异常则回滚
channel.txRollback
,如果成功则channel.txCommit
- 设置
-
如果事务提交成功,则消息一定达到了
RabbitMQ
.因为只有消息成功被RabbitMQ
接收,事务才能提交成功。 -
代码
@Test public void testTransaction() { ConnectionFactory factory = new ConnectionFactory(); String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String queueName = "jannal.direct.transaction.queue"; String exchange = "jannal.direct.transaction.exchange"; String routingKey = "SMS"; String bindingKey = "SMS"; String hostName = "jannal.mac.com"; int portNumber = 5672; factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = false; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); channel.exchangeDeclare(exchange, "direct", true); channel.queueBind(queueName, exchange, bindingKey); boolean mandatory = false; boolean immediate = false; String msg = "Hello, world "; try { //开启事务 channel.txSelect(); channel.basicPublish(exchange, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); /* //模拟错误 if(true){ throw new RuntimeException("error"); }*/ //提交事务 channel.txCommit(); } catch (Exception e) { logger.error(e.getMessage(), e); //回滚事务 channel.txRollback(); } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
-
抓包分析,可以看到整个事务执行流程如下
-
将上面注释的模拟异常代码打开,看看事务回滚的流程
-
如果在
channel.txCommit()
之后出现异常然后调用channel.txRollback()
消息会不会成功呢?将模拟错误代码放到channel.txCommit()
后面.结果是即使我们执行了channel.txRollback()
但是消息依然存储在队列里了,即channel.txCommit()
一旦成功,回滚无效try { //开启事务 channel.txSelect(); channel.basicPublish(exchange, routingKey, mandatory, imhttps://gitee.com/jannal/images/raw/master/RabbitMQ/te, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); //提交事务 channel.txCommit(); //模拟错误 if(true){ throw new RuntimeException("error"); } } catch (Exception e) { logger.error(e.getMessage(), e); //回滚事务 channel.txRollback(); }
publish Confirm 模式
- 在标准的
AMQP0-9-1
中,保证发送消息不会丢失的唯一方法是使用事务(这里的事务与数据库的事物不同),在通道上开启事务,发布消息,提交事务。但是事务是非常重量级的,严重影响RabbitMQ
的吞吐量,为了解决这个问题RabbitMQ
进行了扩展,引入了publisher confirms
机制,模拟AMQP
协议中Consumer
的消息确认机制 -
publisher confirms
的几种方式- 同步单个模式
- 同步批量模式
- 异步模式
- 对于可路由的消息,
Broker
在以下情况都满足的情况下才会回复Basic.Ack
消息- 消息被路由到所有的队列中
- 对于路由到持久队列的持久消息,持久化消息到磁盘后
- 如果队列是镜像队列,则需要将消息同步到所有的队列中
- RabbitMQ可能不以发布的顺序向发布者发送确认消息。生产者端尽量不要依赖消息确认的顺序做服务
同步方式发送
单条同步方式发送
-
代码
@Test public void testSingleSyncConfirm() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String queueName = "jannal.queue.confirm"; String exchange = "jannal.exchange.confirm"; String hostName = "jannal.mac.com"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); channel.exchangeDeclare(exchange, "topic", true); channel.queueBind(queueName, exchange, "*.#"); //开启confirm模式 channel.confirmSelect(); String msg = "Hello, world "; for (int i = 0; i < 10; i++) { try { channel.basicPublish(exchange, "*.#", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); if (channel.waitForConfirms()) { logger.info("第{}条消息已经确认", i); //实际应用中应该更新本地消息(防止发送消息因为异常丢失)的状态,确认发送状态,此处可以异步更新 continue; } //如果没有成功或者发生异常,不对本地消息进行处理,等待下一次发送 } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error(e.getMessage(), e); } catch (Exception e) { logger.error(e.getMessage(), e); } } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
-
抓包如下图,在执行发送消息前执行
Confirm.Select
,Broker
在消息已经正确处理完毕之后返回Basic.ack
给客户端。Basic.ack
表示消息已经被正确处理,不会因为Broker
的异常而丢失消息,比如消息如果是持久化的,那么只有持久化后才会返沪Basic.ack
,如果有镜像队列,则队列完全同步之后才会返回Basic.ack
。 -
查看任意一个
Basic.Ack
,第一张图表示Broker
已经正确处理delivery-Tag=1
的数据,第二张图表示Broker
已经正确处理delivery-Tag=9
的数据
批量同步发送
-
代码
@Test public void testBatchSyncConfirm() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String queueName = "jannal.queue.batch.confirm"; String exchange = "jannal.exchange.batch.confirm"; String hostName = "jannal.mac.com"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); channel.exchangeDeclare(exchange, "topic", true); channel.queueBind(queueName, exchange, "*.#"); //开启confirm模式 channel.confirmSelect(); String msg = null; for (int i = 0; i < 10; i++) { msg = "Hello, world " + i; channel.basicPublish(exchange, "*.#", null, msg.getBytes("UTF-8")); } // 在这种的模式中,如果发送N条消息,如果有一条失败,则所有的消息都需要重新推送 try { if (channel.waitForConfirms()) { //实际应用用批量更新本地消息的状态为已发送状态 logger.info("批量更新本地消息的状态为已发送状态"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error(e.getMessage(), e); } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
-
抓包如下图,
Basic.Ack
的Delivery-Tag
的值是Broker
生成的,这里的Delivery-Tag=2
值并不代表客户端发送的第二条记录的确认。所以批量确认一旦有一条数据丢失或者发送失败,此时无法知道是哪一条发送失败,只能重新再次发送这批数据。broker
通过设置Basic.Ack
中的的multiple=true
来表明到指定***为止的所有消息都已被broker
正确的处理了。
异步发送
-
代码
@Test public void testBatchASyncConfirm() { String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String queueName = "jannal.queue.async.confirm"; String exchange = "jannal.direct.exchange.async.confirm"; String hostName = "jannal.mac.com"; String routingKey = "SMS"; String bindingKey = "SMS"; int portNumber = 5672; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; Channel channel = null; try { ConcurrentSkipListSet confirmSet = new ConcurrentSkipListSet(); conn = factory.newConnection(); channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); channel.exchangeDeclare(exchange, "direct", true); channel.queueBind(queueName, exchange, bindingKey); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { //消息多久被ack是无法确定的 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果multiple是true,就意味着,小于等于deliveryTag的消息都处理成功了 if (multiple) { logger.info("multiple,批量移除:{}", deliveryTag + 1); confirmSet.headSet(deliveryTag + 1).clear(); } else { logger.info("单个移除:{}", deliveryTag); confirmSet.remove(deliveryTag); } } /** * 当RabbitMQ无法成功的处理消息时,它会返回生产者端basic.nack * basic.nack只有Erlange进程在处理队列时发生内部错误时才会被回送 */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //如果是true,就意味着,小于等于deliveryTag的消息都处理失败了 if (multiple) { logger.info("multiple nack 批量移除:{}", deliveryTag + 1); confirmSet.headSet(deliveryTag + 1).clear(); } else { logger.info("nack单个移除:{}", deliveryTag); confirmSet.remove(deliveryTag); } } }); boolean mandatory = false; boolean immediate = false; String msg = null; for (int i = 0; i < 10; i++) { long nextPublishSeqNo = channel.getNextPublishSeqNo(); msg = "Hello, world " + i; channel.basicPublish(exchange, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); confirmSet.add(nextPublishSeqNo); } //等待异步确认完毕 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
-
运行结果
22:48:21.911 [AMQP Connection 127.0.0.1:5672] INFO rabbitmq-producer - 单个移除:1 22:48:21.914 [AMQP Connection 127.0.0.1:5672] INFO rabbitmq-producer - multiple,批量移除:9
-
抓包如下图,第一次
Broker
是单个Ack,Delivery-Tag=1
,第二次Broker
是批量Ack
总结
- 单个
confirm
的性能理论上比事务模式好(毕竟confirm模式下,只需要一条交互Basic.Ack
,而事务模式下需要Tx.Commit/RollBack
和Tx.Commit-Ok/Tx.RollBack-OK
两条交互),批量confirm
大部分时候比单个confirm
好,但是一旦出现confirm返回Basic.Nack
或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能会急剧下降。 - 事务机制与
publisher confirm
机制两者不能共存。既不能开启事务模式后有开启publisher confirm
模式。