RocketMQ的顺序消费和事务消费

一、三种消费 :1.普通消费 2. 顺序消费 3.事务消费

1.1  顺序消费:在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费,他的实现是生产者(一个生产者可以对多个主题去发送消息)将这个三个消息放在topic(一个topic默认有4个队列)的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后顺序消费。

单个节点(Producer端1个、Consumer端1个)

Producer端

 

 
  1. package order;

  2.  
  3. import java.util.List;

  4.  
  5. import com.alibaba.rocketmq.client.exception.MQBrokerException;

  6. import com.alibaba.rocketmq.client.exception.MQClientException;

  7. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

  8. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

  9. import com.alibaba.rocketmq.client.producer.SendResult;

  10. import com.alibaba.rocketmq.common.message.Message;

  11. import com.alibaba.rocketmq.common.message.MessageQueue;

  12. import com.alibaba.rocketmq.remoting.exception.RemotingException;

  13.  
  14. /**

  15. * Producer,发送顺序消息

  16. */

  17. public class Producer {

  18. public static void main(String[] args) {

  19. try {

  20. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");

  21. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  22.  
  23. producer.start();

  24.  
  25. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",

  26. // "TagE" };

  27.  
  28. for (int i = 1; i <= 5; i++) {

  29.  
  30. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

  31.  
  32. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  33. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  34. Integer id = (Integer) arg;

  35. int index = id % mqs.size();

  36. return mqs.get(index);

  37. }

  38. }, 0);

  39.  
  40. System.out.println(sendResult);

  41. }

  42.  
  43. producer.shutdown();

  44. } catch (MQClientException e) {

  45. e.printStackTrace();

  46. } catch (RemotingException e) {

  47. e.printStackTrace();

  48. } catch (MQBrokerException e) {

  49. e.printStackTrace();

  50. } catch (InterruptedException e) {

  51. e.printStackTrace();

  52. }

  53. }

  54. }

 

Consumer端代码

 

 
  1. package order;

  2.  
  3. import java.util.List;

  4. import java.util.concurrent.TimeUnit;

  5. import java.util.concurrent.atomic.AtomicLong;

  6.  
  7. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

  8. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

  9. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

  10. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;

  11. import com.alibaba.rocketmq.client.exception.MQClientException;

  12. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

  13. import com.alibaba.rocketmq.common.message.MessageExt;

  14.  
  15. /**

  16. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)

  17. */

  18. public class Consumer1 {

  19.  
  20. public static void main(String[] args) throws MQClientException {

  21. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  22. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  23.  
  24. /**

  25. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

  26. * 如果非第一次启动,那么按照上次消费的位置继续消费

  27. */

  28. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  29.  
  30. consumer.subscribe("TopicOrderTest", "*");

  31.  
  32. consumer.registerMessageListener(new MessageListenerOrderly() {

  33. AtomicLong consumeTimes = new AtomicLong(0);

  34.  
  35. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  36. // 设置自动提交

  37. context.setAutoCommit(true);

  38. for (MessageExt msg : msgs) {

  39. System.out.println(msg + ",内容:" + new String(msg.getBody()));

  40. }

  41.  
  42. try {

  43. TimeUnit.SECONDS.sleep(5L);

  44. } catch (InterruptedException e) {

  45.  
  46. e.printStackTrace();

  47. }

  48. ;

  49.  
  50. return ConsumeOrderlyStatus.SUCCESS;

  51. }

  52. });

  53.  
  54. consumer.start();

  55.  
  56. System.out.println("Consumer1 Started.");

  57. }

  58.  
  59. }

 

 

结果如下图所示:

RocketMQ的顺序消费和事务消费

这个五条数据被顺序消费了

 

多个节点(Producer端1个、Consumer端2个)

Producer端代码:

 

 
  1. package order;

  2.  
  3. import java.util.List;

  4.  
  5. import com.alibaba.rocketmq.client.exception.MQBrokerException;

  6. import com.alibaba.rocketmq.client.exception.MQClientException;

  7. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

  8. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;

  9. import com.alibaba.rocketmq.client.producer.SendResult;

  10. import com.alibaba.rocketmq.common.message.Message;

  11. import com.alibaba.rocketmq.common.message.MessageQueue;

  12. import com.alibaba.rocketmq.remoting.exception.RemotingException;

  13.  
  14. /**

  15. * Producer,发送顺序消息

  16. */

  17. public class Producer {

  18. public static void main(String[] args) {

  19. try {

  20. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");

  21. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  22.  
  23. producer.start();

  24.  
  25. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",

  26. // "TagE" };

  27.  
  28. for (int i = 1; i <= 5; i++) {

  29.  
  30. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

  31.  
  32. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  33. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  34. Integer id = (Integer) arg;

  35. int index = id % mqs.size();

  36. return mqs.get(index);

  37. }

  38. }, 0);

  39.  
  40. System.out.println(sendResult);

  41. }

  42. for (int i = 1; i <= 5; i++) {

  43.  
  44. Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());

  45.  
  46. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  48. Integer id = (Integer) arg;

  49. int index = id % mqs.size();

  50. return mqs.get(index);

  51. }

  52. }, 1);

  53.  
  54. System.out.println(sendResult);

  55. }

  56. for (int i = 1; i <= 5; i++) {

  57.  
  58. Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());

  59.  
  60. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

  61. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

  62. Integer id = (Integer) arg;

  63. int index = id % mqs.size();

  64. return mqs.get(index);

  65. }

  66. }, 2);

  67.  
  68. System.out.println(sendResult);

  69. }

  70.  
  71. producer.shutdown();

  72. } catch (MQClientException e) {

  73. e.printStackTrace();

  74. } catch (RemotingException e) {

  75. e.printStackTrace();

  76. } catch (MQBrokerException e) {

  77. e.printStackTrace();

  78. } catch (InterruptedException e) {

  79. e.printStackTrace();

  80. }

  81. }

  82. }

 

Consumer1

 

 
  1. /**

  2. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)

  3. */

  4. public class Consumer1 {

  5.  
  6. public static void main(String[] args) throws MQClientException {

  7. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  8. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  9.  
  10. /**

  11. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

  12. * 如果非第一次启动,那么按照上次消费的位置继续消费

  13. */

  14. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  15.  
  16. consumer.subscribe("TopicOrderTest", "*");

  17.  
  18. /**

  19. * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

  20. *,第二个线程无法访问这个队列

  21. */

  22. consumer.registerMessageListener(new MessageListenerOrderly() {

  23. AtomicLong consumeTimes = new AtomicLong(0);

  24.  
  25. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  26. // 设置自动提交

  27. context.setAutoCommit(true);

  28. for (MessageExt msg : msgs) {

  29. System.out.println(msg + ",内容:" + new String(msg.getBody()));

  30. }

  31.  
  32. try {

  33. TimeUnit.SECONDS.sleep(5L);

  34. } catch (InterruptedException e) {

  35.  
  36. e.printStackTrace();

  37. }

  38. ;

  39.  
  40. return ConsumeOrderlyStatus.SUCCESS;

  41. }

  42. });

  43.  
  44. consumer.start();

  45.  
  46. System.out.println("Consumer1 Started.");

  47. }

  48.  
  49. }


 

 

Consumer2

 

 
  1. /**

  2. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)

  3. */

  4. public class Consumer2 {

  5.  
  6. public static void main(String[] args) throws MQClientException {

  7. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");

  8. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  9.  
  10. /**

  11. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

  12. * 如果非第一次启动,那么按照上次消费的位置继续消费

  13. */

  14. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  15.  
  16. consumer.subscribe("TopicOrderTest", "*");

  17.  
  18. /**

  19. * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

  20. *,第二个线程无法访问这个队列

  21. */

  22. consumer.registerMessageListener(new MessageListenerOrderly() {

  23. AtomicLong consumeTimes = new AtomicLong(0);

  24.  
  25. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

  26. // 设置自动提交

  27. context.setAutoCommit(true);

  28. for (MessageExt msg : msgs) {

  29. System.out.println(msg + ",内容:" + new String(msg.getBody()));

  30. }

  31.  
  32. try {

  33. TimeUnit.SECONDS.sleep(5L);

  34. } catch (InterruptedException e) {

  35.  
  36. e.printStackTrace();

  37. }

  38. ;

  39.  
  40. return ConsumeOrderlyStatus.SUCCESS;

  41. }

  42. });

  43.  
  44. consumer.start();

  45.  
  46. System.out.println("Consumer2 Started.");

  47. }

  48.  
  49. }


 

 

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息
Consumer1消费情况如图,都按照顺序执行了

RocketMQ的顺序消费和事务消费


Consumer2消费情况如图,都按照顺序执行了

RocketMQ的顺序消费和事务消费

 



二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事物。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

原文地址:http://www.jianshu.com/p/453c6e7ff81c

RocketMQ的顺序消费和事务消费

 

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

例子:

Consumer 端

 

 
  1. package transaction;

  2.  
  3. import java.util.List;

  4.  
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

  9. import com.alibaba.rocketmq.client.exception.MQClientException;

  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

  11. import com.alibaba.rocketmq.common.message.MessageExt;

  12.  
  13. /**

  14. * Consumer,订阅消息

  15. */

  16. public class Consumer {

  17.  
  18. public static void main(String[] args) throws InterruptedException, MQClientException {

  19. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");

  20. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  21. consumer.setConsumeMessageBatchMaxSize(10);

  22. /**

  23. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

  24. * 如果非第一次启动,那么按照上次消费的位置继续消费

  25. */

  26. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  27.  
  28. consumer.subscribe("TopicTransactionTest", "*");

  29.  
  30. consumer.registerMessageListener(new MessageListenerConcurrently() {

  31.  
  32. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

  33.  
  34. try {

  35.  
  36. for (MessageExt msg : msgs) {

  37. System.out.println(msg + ",内容:" + new String(msg.getBody()));

  38. }

  39.  
  40. } catch (Exception e) {

  41. e.printStackTrace();

  42.  
  43. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试

  44.  
  45. }

  46.  
  47. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功

  48. }

  49. });

  50.  
  51. consumer.start();

  52.  
  53. System.out.println("transaction_Consumer Started.");

  54. }

  55. }

 

 

Producer端

 

 
  1. package transaction;

  2.  
  3. import com.alibaba.rocketmq.client.exception.MQClientException;

  4. import com.alibaba.rocketmq.client.producer.SendResult;

  5. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

  6. import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

  7. import com.alibaba.rocketmq.common.message.Message;

  8.  
  9. /**

  10. * 发送事务消息例子

  11. *

  12. */

  13. public class Producer {

  14. public static void main(String[] args) throws MQClientException, InterruptedException {

  15.  
  16. TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

  17. TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");

  18. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

  19. // 事务回查最小并发数

  20. producer.setCheckThreadPoolMinSize(2);

  21. // 事务回查最大并发数

  22. producer.setCheckThreadPoolMaxSize(2);

  23. // 队列数

  24. producer.setCheckRequestHoldMax(2000);

  25. producer.setTransactionCheckListener(transactionCheckListener);

  26. producer.start();

  27.  
  28. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"

  29. // };

  30. TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

  31. for (int i = 1; i <= 2; i++) {

  32. try {

  33. Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,

  34. ("Hello RocketMQ " + i).getBytes());

  35. SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

  36. System.out.println(sendResult);

  37.  
  38. Thread.sleep(10);

  39. } catch (MQClientException e) {

  40. e.printStackTrace();

  41. }

  42. }

  43.  
  44. for (int i = 0; i < 100000; i++) {

  45. Thread.sleep(1000);

  46. }

  47.  
  48. producer.shutdown();

  49.  
  50. }

  51. }


TransactionExecuterImpl  --执行本地事务

 
  1. package transaction;

  2.  
  3. import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

  4. import com.alibaba.rocketmq.client.producer.LocalTransactionState;

  5. import com.alibaba.rocketmq.common.message.Message;

  6.  
  7. /**

  8. * 执行本地事务

  9. */

  10. public class TransactionExecuterImpl implements LocalTransactionExecuter {

  11. // private AtomicInteger transactionIndex = new AtomicInteger(1);

  12.  
  13. public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

  14.  
  15. System.out.println("执行本地事务msg = " + new String(msg.getBody()));

  16.  
  17. System.out.println("执行本地事务arg = " + arg);

  18.  
  19. String tags = msg.getTags();

  20.  
  21. if (tags.equals("transaction2")) {

  22. System.out.println("======我的操作============,失败了 -进行ROLLBACK");

  23. return LocalTransactionState.ROLLBACK_MESSAGE;

  24. }

  25. return LocalTransactionState.COMMIT_MESSAGE;

  26. // return LocalTransactionState.UNKNOW;

  27. }

  28. }


TransactionCheckListenerImpl--未决事务,服务器回查客户端(目前已经被阉割啦)

 

 
  1. package transaction;

  2.  
  3. import com.alibaba.rocketmq.client.producer.LocalTransactionState;

  4. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

  5. import com.alibaba.rocketmq.common.message.MessageExt;

  6.  
  7. /**

  8. * 未决事务,服务器回查客户端

  9. */

  10. public class TransactionCheckListenerImpl implements TransactionCheckListener {

  11. // private AtomicInteger transactionIndex = new AtomicInteger(0);

  12.  
  13. //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。

  14. public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

  15. System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));

  16. // return LocalTransactionState.ROLLBACK_MESSAGE;

  17.  
  18. return LocalTransactionState.COMMIT_MESSAGE;

  19.  
  20. // return LocalTransactionState.UNKNOW;

  21. }

  22. }


 

 

 

 

producer端:发送数据到MQ,并且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了,不会被消费。


RocketMQ的顺序消费和事务消费
 

 

Consumer只会接收到一个,第二个数据不会被接收到

 

RocketMQ的顺序消费和事务消费

 

--------------------- 本文来自 Mr_蜗牛 的**** 博客 ,全文地址请点击:https://blog.****.net/u010634288/article/details/57158374?utm_source=copy