分布式事务——纯MQ实现

一、MQ实现分布式事务,最简单的原理框架:
借助MQ的消息可靠传递,实现业务间解耦、事务强一致

1、>> 生产者发送消息做可靠性检查,确保消息真正投递出去;

2、>> 消费者做幂等,确保业务没有重复执行;

3、>> 消费者做异常重试,反复出错时需要捕捉异常并记录,以便手工干预;

二、场景实践:
场景
以支付宝转账到余额宝为例,在支付宝已经扣款成功的情况下,余额宝一定收到转账

>> 支付宝和余额宝是两个微服务;

>> 用户用支付宝转1万元到余额宝;

支付宝账户先扣除金额,MQ通知余额宝账户添加金额;

>> 支付宝账户表: update A set amount = amount - 10000 where userId = 1;

>> 余额宝账户表: update B set amount = amount + 10000 where userId = 1;

具体操作
1、创建队列
>> 创建一个持久化的队列,名称为money;
分布式事务——纯MQ实现
Durable(持久化保存),Transient(即时保存)。

2、 maven引入
(余额宝和支付宝两个服务都需要)

[Java] 纯文本查看 复制代码
1
2
3
4
5
6
7
<dependency>
 
    <groupId>org.springframework.boot</groupId>
 
    <artifactId>spring-boot-starter-amqp</artifactId>
 
</dependency>



AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。



3、  添加配置
(余额宝和支付宝两个服务都需要)

[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
spring:   
 
    rabbitmq:
 
        host: 192.168.222.26 #MQ地址
 
        port: 5672 #MQ端口
 
        username: admin #MQ用户名
 
        password: admin #MQ密码
 
        publisher-confirms: true #开启消息发送成功监听
 
        publisher-returns: true #开启消息发送失败监听
 
        listener:
 
            simple:
 
                acknowledge-mode: manual #手动提交事务



4 支付宝端代码编写
向支付宝账号扣款,同时发消息到队列中,通知余额宝账号金额更新

>>  配置MQ发送过程监听

[Java] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.my.server.clientdemo.zhifu;
  
import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
  
/**
 * 配置MQ发送过程监听
 * @author twotiger-wxm.
 * @date 2020-3-3.
 */
@Configuration
public class RabbmitMqConfig {
  
        @Autowired
        private RabbitTemplate rabbitTemplate;
  
        @PostConstruct
        public void init() {
            // 设置发送成功回调
            rabbitTemplate.setConfirmCallback(initConfirmCallback());
            // 设置发送失败回调
            rabbitTemplate.setReturnCallback(initReturnCallback());
        }
  
        @Bean
        public ReturnCallback initReturnCallback(){
            return new FailedListener();
        }
  
        @Bean
        public ConfirmCallback initConfirmCallback(){
            return new SuccessListener();
        }
  
}

 

[AppleScript] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package cn.my.server.clientdemo.zhifu;
 
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 
import org.springframework.amqp.rabbit.support.CorrelationData;
 
/**
 
 * 消息发送成功时触发
 
 * 测试发现队列不存在也会触发,但会在FailedListener之后触发
 
 */
 
public class SuccessListener implements ConfirmCallback{
 
    @Override
 
    public void confirm(CorrelationData data, boolean success, String result) {
 
        System.out.println(data+","+success+","+result);
 
        if(null!=data){
 
            System.out.println("业务主键:"+data.getId());
 
        }
 
        if(success){  
 
            boolean resultT = true;// 获取流水表状态,为fail则失败(因为队列不存在也会触发,且在失败监听之后触发,所以此处优先排除一下失败监听的操作。)
 
            if(resultT){
 
                //更新支付宝账户金额
 
                System.out.println("消息发送到MQ成功");
 
                System.out.println("【支付宝账户扣款】  update A set amount = amount - 10000 where userId = 1");
 
            }else{
 
                System.out.println("消息发送到MQ失败");
 
                System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
 
            }     
 
        }else{
 
            System.out.println("消息发送到MQ失败原因:"+result);
 
            System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
 
        }     
 
    }
 
      
 
}
 
package cn.my.server.clientdemo.zhifu;
 
import java.util.HashMap;
 
import org.springframework.amqp.core.Message;
 
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
/**
 
 * 消息发送失败时,监听被触发
 
 * 一般像队列不存在,或者网络中断时会触发,且第一个触发
 
 */
 
public class FailedListener implements ReturnCallback{
 
    private ObjectMapper mapper = new ObjectMapper();
 
    @Override
 
    public void returnedMessage(Message message, int state, String result, String arg3, String queneName) {
 
        System.out.println(message+","+state+","+result+","+arg3+","+queneName);
 
          
 
        try {
 
            String msg = new String(message.getBody());
 
            HashMap data = mapper.readValue(msg, HashMap.class);// 失败的消息内容
 
            System.out.println("失败的转账记录:"+mapper.writeValueAsString(data));
 
            System.out.println("【支付宝账户不扣款】根据业务主键更新支付宝流水表状态为失败,update A_MESSAGE set sts = fail where transId=000001");
 
        } catch (Exception e) {
 
            e.printStackTrace();
 
        }
 
    }
 
      
 
}



>>  转账接口编写

[Java] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
package cn.my.server.clientdemo.zhifu;
 
import java.util.HashMap;
 
import java.util.Map;
 
import org.springframework.amqp.AmqpException;
 
import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
import org.springframework.amqp.rabbit.support.CorrelationData;
 
import org.springframework.beans.factory.annotation.Autowired;
 
import org.springframework.transaction.annotation.Transactional;
 
import org.springframework.web.bind.annotation.GetMapping;
 
import org.springframework.web.bind.annotation.RequestMapping;
 
import org.springframework.web.bind.annotation.RestController;
 
import com.fasterxml.jackson.core.JsonProcessingException;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
/**
 
 * 支付宝端业务测试
 
 */
 
@RestController
 
@RequestMapping("/api/test")
 
public class ZhiFuBaoResource {
 
    /**
 
     * 测试接口
 
     * @throws AmqpException
 
     * @throws JsonProcessingException
 
     */
 
    @GetMapping("/mq")
 
    public void trans1() throws AmqpException, JsonProcessingException {
 
        trans();
 
    }
 
    @Autowired
 
    private RabbitTemplate rabbitTemplate;
 
    private ObjectMapper mapper = new ObjectMapper();
 
    /**
 
     * 转账方法
 
     * @return
 
     * @throws AmqpException
 
     * @throws JsonProcessingException
 
     */
 
    @Transactional
 
    public boolean trans() throws AmqpException, JsonProcessingException {
 
        Map data = new HashMap<String, Object>();
 
        String transId = "00000001";//生成业务流水号
 
        data.put("transId", transId);
 
        data.put("userId", "1");
 
        data.put("money", 10000);
 
          
 
        //A_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
 
        System.out.println("【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)");
 
              
 
        // 流水号传入,方便消息发送失败时做操作
 
        CorrelationData correlationData = new CorrelationData(transId);
 
        rabbitTemplate.convertAndSend("","money",mapper.writeValueAsString(data),correlationData);
 
        return true;
 
    }
 
}


5 余额宝端代码编写

监听支付宝发到消息队列中的消息,做余额宝账号金额更新

 

[Java] 纯文本查看 复制代码

分布式事务——纯MQ实现

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
<font style="color:rgb(77, 77, 77)"><font face="&quot;"><font style="font-size:16px">package cn.my.server.clientdemo.yuer;
 
import java.io.IOException;
 
import java.util.Collections;
 
import java.util.HashMap;
 
import java.util.Map;
 
import org.slf4j.Logger;
 
import org.slf4j.LoggerFactory;
 
import org.springframework.amqp.core.Message;
 
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
 
import org.springframework.retry.RecoveryCallback;
 
import org.springframework.retry.RetryCallback;
 
import org.springframework.retry.RetryContext;
 
import org.springframework.retry.backoff.FixedBackOffPolicy;
 
import org.springframework.retry.policy.SimpleRetryPolicy;
 
import org.springframework.retry.support.RetryTemplate;
 
import org.springframework.stereotype.Component;
 
import org.springframework.transaction.annotation.Transactional;
 
import com.fasterxml.jackson.core.JsonProcessingException;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
import com.rabbitmq.client.Channel;
 
/**
 
 * 余额宝端消息监听
 
 */
 
@Component
 
public class YuErBaoMessageListeners {
 
    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
    private ObjectMapper mapper = new ObjectMapper();
 
  
 
    /**
 
     * 监听消息队列
 
     * @param message  消息内容
 
     * @param channel  消息渠道
 
     * @throws IOException  异常
 
     */
 
    @RabbitListener(queues = "money")
 
    @RabbitHandler
 
    public void receiveQueue(Message message, Channel channel) throws IOException {
 
        String msg = "";
 
        try {
 
            // 业务处理逻辑
 
            msg = new String(message.getBody());
 
            Map data = mapper.readValue(msg, HashMap.class);
 
            retry(data);
 
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
 
        } catch (Exception e) {
 
            log.error("MQ接收消息内容[" + msg + "],后处理异常:" + e);
 
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 手动应答消息已经处理
 
        }
 
    }
 
  
 
    /**
 
     * 更新余额宝账户金额
 
     *
 
     * @param map
 
     */
 
    @Transactional
 
    public void bizOp(Map<String, Object> map) {
 
        // B_MESSAGE交易流水表,主键是【交易编号】,重复插入会报异常,类似于幂等操作
 
        System.out.println(
 
            "【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)");
 
        // 更新余额宝账户金额
 
        System.out.println("【余额宝账户入款】 update B set amount = amount +10000 where userId = 1");
 
  
 
    }
 
  
 
    /**
 
     * 一直报错,重试次数用完了,保存如下信息供人工干预
 
     *
 
     * @param map
 
     * @throws JsonProcessingException
 
     */
 
    public void failed(Map<String, Object> map) throws JsonProcessingException {
 
        System.out.println("一直报错,重试次数用完了,保存如下信息供人工干预:\n" + mapper.writeValueAsString(map));
 
    }
 
  
 
    /**
 
     * 异常时最多重试 3次,成功为止
 
     *
 
     * @param map
 
     *            输入参数
 
     */
 
    private void retry(Map<String, Object> map) {
 
        // 构建重试模板实例
 
        RetryTemplate retryTemplate = new RetryTemplate();
 
        // 设置重试次数
 
        SimpleRetryPolicy policy = new SimpleRetryPolicy(3,
 
                Collections.<Class<? extends Throwable>, Boolean>singletonMap(Exception.class, true));
 
        // 设置重试间隔时间
 
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
 
        fixedBackOffPolicy.setBackOffPeriod(100);
 
        retryTemplate.setRetryPolicy(policy);
 
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
 
        // 编写业务处理代码逻辑
 
        final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
 
            public Object doWithRetry(RetryContext context) throws Exception {
 
                System.out.println("第" + (1 + context.getRetryCount()) + "次处理");
 
                try {
 
                    bizOp(map);
 
                } catch (Exception e) {
 
                    e.printStackTrace();
 
                    throw new Exception("捕捉到业务处理异常,需要抛出");// 这个点特别注意,重试的根源通过Exception返回
 
                }
 
                return null;
 
            }
 
        };
 
        // 重试次数执行完依然报错,走如下逻辑
 
        final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
 
            public Object recover(RetryContext context) throws Exception {
 
                failed(map);
 
                return null;
 
            }
 
        };
 
        try {
 
            // 由retryTemplate 执行execute方法开始逻辑执行
 
            retryTemplate.execute(retryCallback, recoveryCallback);
 
        } catch (Exception e) {
 
            e.printStackTrace();
 
        }
 
    }
 
}
</font></font></font>

 

6 测试

>> 浏览器直接访问 /api/test/mq

>> 控制台结果如下:

 

[Java] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
<font style="color:rgb(77, 77, 77)"><font face="&quot;"><font style="font-size:16px">【支付宝记账操作】重复插入会报异常,类似于幂等操作  insert into A_MESSAGE(transId,userId,money) values (000001,1,10000)
 
【余额宝记账操作】重复插入会报异常,类似于幂等操作 insert into B_MESSAGE(transId,userId,money) values (000001,1,10000)
 
【余额宝账户入款】 update B set amount = amount + 10000 where userId = 1
 
【支付宝账户扣款】 update A set amount = amount - 10000 where userId = 1
</font></font></font>