Handling distributed transactions across multiple data sources in Spring Boot with JTA + Atomikos

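Approach to splitting data sources:

Example: a company uses two databases, one dedicated to shared configuration data and one vertical business database.

Vertical: each database is dedicated to a specific business area.

There is no fixed limit on the number of data sources in a single project. In practice the number is bounded by available memory, since each data source typically maintains its own connection pool.

How to partition multiple data sources within one project:

(1) By package name (the common approach):

com.lqr.test01 -> datasource1

com.lqr.test02 -> datasource2

This is similar to having several separate jar packages, each serving a different business requirement, except that the different business requirements live in the same project.

(2) By annotation:

Define a custom annotation and intercept it with AOP.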
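
The annotation approach can be sketched in plain Java. This is a minimal illustration only, using a JDK dynamic proxy in place of Spring AOP; the names UseDataSource, DataSourceContext, ReportService and DataSourceRouting are hypothetical and do not come from the original project. In a Spring application the thread-local key would typically be read by a routing data source such as AbstractRoutingDataSource.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

/** Hypothetical marker annotation naming the data source a method should use. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface UseDataSource {
    String value();
}

/** Holds the data source key chosen for the current thread. */
class DataSourceContext {
    private static final ThreadLocal<String> KEY =
            ThreadLocal.withInitial(() -> "datasource1"); // default data source

    static String current() { return KEY.get(); }
    static void set(String key) { KEY.set(key); }
    static void clear() { KEY.remove(); }
}

interface ReportService {
    String loadConfig();                 // no annotation: uses the default

    @UseDataSource("datasource2")
    String loadOrders();                 // routed to the second data source
}

class ReportServiceImpl implements ReportService {
    public String loadConfig() { return "read from " + DataSourceContext.current(); }
    public String loadOrders() { return "read from " + DataSourceContext.current(); }
}

class DataSourceRouting {
    /** Wraps the target so that the annotation is evaluated around every call. */
    static ReportService wrap(ReportService target) {
        return (ReportService) Proxy.newProxyInstance(
                ReportService.class.getClassLoader(),
                new Class<?>[] { ReportService.class },
                (proxy, method, args) -> {
                    UseDataSource marker = method.getAnnotation(UseDataSource.class);
                    if (marker != null) {
                        DataSourceContext.set(marker.value());
                    }
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    } finally {
                        DataSourceContext.clear(); // always restore the default
                    }
                });
    }
}
```

Calling loadOrders() on the wrapped service reads from datasource2, while loadConfig() falls back to the default. Compared with splitting by package, this keeps the routing decision next to the method, at the cost of an extra interception layer.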

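Spring transaction management:

(1) Types of Spring transactions:

Declarative transactions

Programmatic transactions

(2) How Spring transactions work:

Spring uses AOP around advice to intercept the method call: a transaction is begun before the method runs, committed if the method returns normally, and rolled back if it throws (by default, for unchecked exceptions and errors).

Caveat when using Spring transactions: do not catch and swallow exceptions inside the transactional method. The exception has to propagate to the outer layer (the transaction interceptor); otherwise no rollback is triggered.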
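
Why a swallowed exception defeats the rollback can be shown with a small stand-in for the around advice. The sketch below uses a JDK dynamic proxy rather than Spring itself, and only records "begin", "commit" and "rollback" events; UserService, UserServiceImpl and TransactionalProxy are hypothetical names chosen for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;

interface UserService {
    void insertPropagating(int age);
    void insertSwallowing(int age);
}

class UserServiceImpl implements UserService {
    public void insertPropagating(int age) {
        int ignored = 1 / age; // ArithmeticException reaches the proxy when age == 0
    }

    public void insertSwallowing(int age) {
        try {
            int ignored = 1 / age;
        } catch (ArithmeticException e) {
            // Swallowed: the proxy never learns that something failed.
        }
    }
}

class TransactionalProxy {
    /** Around advice: begin before the call, commit on return, roll back on exception. */
    static UserService wrap(UserService target, List<String> events) {
        return (UserService) Proxy.newProxyInstance(
                UserService.class.getClassLoader(),
                new Class<?>[] { UserService.class },
                (proxy, method, args) -> {
                    events.add("begin");
                    try {
                        Object result = method.invoke(target, args);
                        events.add("commit");
                        return result;
                    } catch (InvocationTargetException e) {
                        events.add("rollback");
                        throw e.getCause(); // rethrow the original exception
                    }
                });
    }
}
```

With age set to 0, insertPropagating records begin followed by rollback, whereas insertSwallowing records begin followed by commit, even though the same failure happened inside the method.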

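(3) The @Transactional annotation:

@Transactional solves the consistency problem where, among several SQL statements in one method, an earlier insert succeeds and a later insert fails: if any statement throws an exception, the whole method is rolled back.

When working with multiple data sources, each DataSource needs its own transaction manager (transactionManager). Annotate each method that requires transaction management with @Transactional and specify the transaction manager to use, so that the SQL statements executed against that data source are rolled back on failure.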
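
As an illustration of this per-data-source setup, the following configuration fragment is a minimal sketch and not the original author's code. The bean names test1DataSource and test1TransactionManager and the class names are hypothetical, and the transactionManager attribute of @Transactional assumes Spring 4.2 or later.

```java
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical configuration: one local transaction manager per data source.
@Configuration
class Test1TransactionConfig {

    @Bean(name = "test1TransactionManager")
    public DataSourceTransactionManager test1TransactionManager(
            @Qualifier("test1DataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}

// Hypothetical service whose method is bound to that transaction manager.
@Service
class UserService01 {

    // Rolls back only the work done through test1DataSource.
    @Transactional(transactionManager = "test1TransactionManager")
    public void insertUser(String name, Integer age) {
        // Call a mapper bound to test1DataSource here.
    }
}
```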

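However, there is a problem. If a method performs insert, delete or update statements against tables in two data sources and only one transaction manager is specified, then when an exception occurs only the SQL operations on that transaction manager's data source are rolled back. The operations on the other data source are not rolled back, which violates consistency.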
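
The partial rollback can be reproduced with a plain-Java simulation. This is a sketch under simplifying assumptions: FakeDatabase is a hypothetical in-memory stand-in for a database, and a write made outside a transaction is treated as auto-committed.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one database with a single local transaction. */
class FakeDatabase {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean inTransaction = false;

    void begin() { inTransaction = true; }

    void insert(String row) {
        if (inTransaction) {
            pending.add(row);   // visible only after commit
        } else {
            committed.add(row); // no transaction: auto-commit
        }
    }

    void commit() { committed.addAll(pending); pending.clear(); inTransaction = false; }

    void rollback() { pending.clear(); inTransaction = false; }

    List<String> rows() { return new ArrayList<>(committed); }
}

class PartialRollbackDemo {
    /** Business method whose transaction covers db1 only. */
    static void insertIntoBoth(FakeDatabase db1, FakeDatabase db2, String name, int age) {
        db1.begin();
        try {
            db1.insert(name);
            db2.insert(name);      // outside db1's transaction, so it auto-commits
            int ignored = 1 / age; // throws ArithmeticException when age == 0
            db1.commit();
        } catch (RuntimeException e) {
            db1.rollback();        // only db1 can be undone
            throw e;
        }
    }

    public static void main(String[] args) {
        FakeDatabase db1 = new FakeDatabase();
        FakeDatabase db2 = new FakeDatabase();
        try {
            insertIntoBoth(db1, db2, "error", 0);
        } catch (ArithmeticException e) {
            System.out.println("db1 rows: " + db1.rows());
            System.out.println("db2 rows: " + db2.rows());
        }
    }
}
```

After the failure, db1 is empty while db2 still contains the inserted row, which is exactly the inconsistency described above.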

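This article shows how to use JTA + Atomikos to solve the distributed transaction problem across multiple data sources.

The steps are as follows:
(1) Add the dependencies to pom.xml: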
<dependencies>
        <!-- jta+atomikos -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

        <!--freemarker -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

        <!-- AOP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <!-- log4j -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
            <version>1.3.8.RELEASE</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- mybatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
(2) Add the configuration for the data sources to application.properties:
##mysql1
mysql.datasource.test1.url = jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8
mysql.datasource.test1.username = root
mysql.datasource.test1.password = 123456
mysql.datasource.test1.minPoolSize = 3
mysql.datasource.test1.maxPoolSize = 25
mysql.datasource.test1.maxLifetime = 20000
mysql.datasource.test1.borrowConnectionTimeout = 30
mysql.datasource.test1.loginTimeout = 30
mysql.datasource.test1.maintenanceInterval = 60
mysql.datasource.test1.maxIdleTime = 60

##mysql2
mysql.datasource.test2.url = jdbc:mysql://localhost:3306/test02?useUnicode=true&characterEncoding=utf-8
mysql.datasource.test2.username = root
mysql.datasource.test2.password = 123456
mysql.datasource.test2.minPoolSize = 3
mysql.datasource.test2.maxPoolSize = 25
mysql.datasource.test2.maxLifetime = 20000
mysql.datasource.test2.borrowConnectionTimeout = 30
mysql.datasource.test2.loginTimeout = 30
mysql.datasource.test2.maintenanceInterval = 60
mysql.datasource.test2.maxIdleTime = 60
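(3) Add a DBConfig1 class that reads the first data source's settings from the configuration file; DBConfig2 does the same for the second data source: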
package com.lqr.springboot.datamuilt02.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "mysql.datasource.test1")
public class DBConfig1 {

    private String url;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}
package com.lqr.springboot.datamuilt02.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "mysql.datasource.test2")
public class DBConfig2 {

    private String url;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}

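Each class carries the @ConfigurationProperties annotation, which reads the configuration file, and the @Data annotation. @Data is provided by Lombok and generates the getters and setters automatically, so they can be omitted from the source.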

(4) Add a MyBatisConfig1 class that configures the DataSource and the related MyBatis beans; MyBatisConfig2 is analogous:
package com.lqr.springboot.datamuilt02.datasource;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.lqr.springboot.datamuilt02.config.DBConfig1;
// Connector/J 5.x class; in Connector/J 8.x the class is com.mysql.cj.jdbc.MysqlXADataSource
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.sql.SQLException;

@Configuration
@MapperScan(basePackages = "com.lqr.springboot.datamuilt02.test01",sqlSessionTemplateRef = "testSqlSessionTemplate")
public class MyBatisConfig1 {

    // Configure the data source
    @Bean(name = "testDataSource")
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException{
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setURL(testConfig.getUrl());
        mysqlXADataSource.setPassword(testConfig.getPassword());
        mysqlXADataSource.setUser(testConfig.getUsername());
        mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);

        // Wrap the XA data source in an Atomikos bean so that it takes part in global (JTA) transactions
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXADataSource);
        xaDataSource.setUniqueResourceName("testDataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "testSqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("testDataSource") DataSource dataSource)
            throws Exception{
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "testSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("testSqlSessionFactory") SqlSessionFactory sqlSessionFactory)throws Exception{
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}
package com.lqr.springboot.datamuilt02.datasource;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.lqr.springboot.datamuilt02.config.DBConfig2;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.sql.SQLException;

@Configuration
@MapperScan(basePackages = "com.lqr.springboot.datamuilt02.test02",sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {

    // Configure the data source
    @Bean(name = "test2DataSource")
    public DataSource testDataSource(DBConfig2 testConfig) throws SQLException{
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setURL(testConfig.getUrl());
        mysqlXADataSource.setPassword(testConfig.getPassword());
        mysqlXADataSource.setUser(testConfig.getUsername());
        mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);

        // Wrap the XA data source in an Atomikos bean so that it takes part in global (JTA) transactions
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXADataSource);
        xaDataSource.setUniqueResourceName("test2DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
            throws Exception{
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory)throws Exception{
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}

Adding @MapperScan(basePackages = "xxx", sqlSessionTemplateRef = "xxx") to each configuration class gives the service and mapper layers in each package their own data source.

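As the configuration shows, JTA + Atomikos works by handing the local transactions of the individual data sources over to a single global transaction, which commits or rolls them back together.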
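
The idea of a global transaction coordinating several local ones can be sketched in plain Java as a simplified two-phase commit. This is only an illustration of the principle, under the assumption that each data source acts as an XA-capable participant. Participant, InMemoryParticipant and GlobalTransaction are hypothetical names, and a real transaction manager such as Atomikos additionally deals with logging, recovery and timeouts.

```java
import java.util.ArrayList;
import java.util.List;

/** One participant in the global transaction, for example a wrapper around one database. */
interface Participant {
    boolean prepare(); // phase 1: vote on whether the local work can be committed
    void commit();     // phase 2: make the local work permanent
    void rollback();   // phase 2 alternative: discard the local work
}

/** In-memory participant used only for illustration. */
class InMemoryParticipant implements Participant {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private final boolean canCommit;

    InMemoryParticipant(boolean canCommit) { this.canCommit = canCommit; }

    void insert(String row) { pending.add(row); }

    public boolean prepare() { return canCommit; }

    public void commit() { committed.addAll(pending); pending.clear(); }

    public void rollback() { pending.clear(); }

    List<String> rows() { return new ArrayList<>(committed); }
}

/** Simplified coordinator: the work is committed everywhere or nowhere. */
class GlobalTransaction {
    private final List<Participant> participants = new ArrayList<>();

    void enlist(Participant participant) { participants.add(participant); }

    /** Returns true if every participant voted yes and was committed. */
    boolean commit() {
        for (Participant p : participants) {
            if (!p.prepare()) {
                rollback(); // a single "no" vote aborts the whole transaction
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    void rollback() {
        for (Participant p : participants) {
            p.rollback();
        }
    }
}
```

If both participants vote yes, both inserts become visible; if either one refuses or the application calls rollback(), neither database keeps its pending row.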

(5) Create the service and mapper layers for each data source in separate packages.

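(6) Add @Transactional only to the service methods that run SQL against both data sources. No transaction manager needs to be specified, because the JTA starter registers a JTA transaction manager that covers both data sources: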
package com.lqr.springboot.datamuilt02.test02.service;

import com.lqr.springboot.datamuilt02.test01.mapper.UserMapperTest01;
import com.lqr.springboot.datamuilt02.test02.mapper.UserMapperTest02;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class UserServiceTest02 {
    @Autowired
    private UserMapperTest02 userMapperTest02;

    @Autowired
    private UserMapperTest01 userMapperTest01;

    @Transactional
    public Integer insertUserAndTestmul(String name,Integer age){
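        // Data source 1
        Integer result1 = userMapperTest01.insertUser(name,age);
        // Data source 2
        Integer result2 = userMapperTest02.insertUser(name,age);
        Integer finalresult=result1+result2;
        // Deliberately throws ArithmeticException when age is 0, to demonstrate the rollback
        int i = 1/age;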
        log.info("{}",finalresult);
        return finalresult;
    }
}
(7) Add @EnableConfigurationProperties to the application class so that the configuration classes are registered and bound:
package com.lqr.springboot.datamuilt02;

import com.lqr.springboot.datamuilt02.config.DBConfig1;
import com.lqr.springboot.datamuilt02.config.DBConfig2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(value = {DBConfig1.class,DBConfig2.class})
public class Datamuilt02Application {

    public static void main(String[] args) {
        SpringApplication.run(Datamuilt02Application.class, args);
    }
}

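Start the project and visit http://127.0.0.1:8080/insertUserAndTestmul?name=error&age=0 to see the effect (this assumes a controller, not shown here, that maps the URL to the service method).

The request fails with a division-by-zero error, so both database inserts should be rolled back. Checking the tables in Navicat shows that neither row was added, which confirms that the operations on both data sources were rolled back.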