ShardingJdbc、Spring和mybatis结合的整个源码执行流程解析

一.背景知识

1.Sharding jdbc

当当提供的轻量级java框架,通过客户端进行分库分表策略执行等操作,直连到数据库
http://shardingsphere.io/document/current/cn/overview/

2.Spring

项目给于spring提供的事务管理和springBoot带来的快速启动

3.Mybatis和Mybatis-Spring

使用mybatis来执行ORM的相关操作,大大减少了单纯使用jdbc带来的大量映射的工作量
http://www.mybatis.org/mybatis-3/zh/index.html
http://www.mybatis.org/spring/zh/

二.通过代码来学习

1.前置代码准备

首先来看一段我们日常经常会写的代码
Controller:

@RequestMapping
	public String insertUser(){
		User user = new User();
		user.setName("hello test spring transactional");
		user.setUserId(0L);
		userService.save(user);
		return "success";
	}

Service:

@Override
	@Transactional
	public User save(User domain) {
		return dao.insertSelective(domain);
	}

Dao:

public User insertSelective(User user){
		myBatisMapper.insertSelective(user);
		return user;
	}

这是三个非常常见的一个MVC模式下的简单insert操作代码,现在就由这个开始我们探索Sharding、Spring和Mybatis整个执行流程的原理

2.执行流程

2.1Spring的DataSourceTransactionManager

众所周知,我们可以使用注解@Transactional非常简便的为DML操作加上事务的管理,我们的第一步也是从DataSourceTransactionManager这里开始讲起,先看看这个类的几个关键方法(篇幅问题只截取里面较为关键的代码):
(1)doBegin:

DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;
	if (!txObject.hasConnectionHolder() ||
		txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
		Connection newCon = obtainDataSource().getConnection();
		if (logger.isDebugEnabled()) {
			logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
		}
		txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
	}
	
    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
	con = txObject.getConnectionHolder().getConnection();

	Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
	txObject.setPreviousIsolationLevel(previousIsolationLevel);

在这里,spring先会在txObject里面设置配置在这个Bean里面的DataSource,获取一个connection并放在一个ConnectionHolder里面(注意在后面spring的rollBack和commit都会使用这个connection),后面通过DataSourceUtils.prepareConnectionForTransaction这个方法会为这个Connection设置只读和事务隔离级别
(2)doCommit:

protected void doCommit(DefaultTransactionStatus status) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
	Connection con = txObject.getConnectionHolder().getConnection();
	if (status.isDebug()) {
		logger.debug("Committing JDBC transaction on Connection [" + con + "]");
	}
	try {
		con.commit();
	}
	catch (SQLException ex) {
		throw new TransactionSystemException("Could not commit JDBC transaction", ex);
	}
}

可以看到会拿放在ConnectionHolder的Connection,进行commit()
(3)doRollBack:

@Override
protected void doRollback(DefaultTransactionStatus status) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
	Connection con = txObject.getConnectionHolder().getConnection();
	if (status.isDebug()) {
		logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
	}
	try {
		con.rollback();
	}
	catch (SQLException ex) {
		throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
	}
}

可以看到是通过之前ConnectionHolder里面的connection进行rollback

执行的流程:
ShardingJdbc、Spring和mybatis结合的整个源码执行流程解析

2.2Mybatis的执行流程

首先mybatis-spring会将Mapper的interface扫描注册成MapperFactoryBean并重写了getObject()方法,在使用@Autowire注解的时候,AutowiredAnnotationBeanPostProcessor这个类会调用inject方法去注入对象

@Override
protected void inject(Object bean, @Nullable String beanName, @Nullable PropertyValues pvs) throws Throwable {
	Field field = (Field) this.member;
	Object value;
	if (this.cached) {
		value = resolvedCachedArgument(beanName, this.cachedFieldValue);
	}
	else {
		DependencyDescriptor desc = new DependencyDescriptor(field, this.required);
		desc.setContainingClass(bean.getClass());
		Set<String> autowiredBeanNames = new LinkedHashSet<>(1);
		Assert.state(beanFactory != null, "No BeanFactory available");
		TypeConverter typeConverter = beanFactory.getTypeConverter();
		try {
			value = beanFactory.resolveDependency(desc, beanName, autowiredBeanNames, typeConverter);
		}
		catch (BeansException ex) {
			throw new UnsatisfiedDependencyException(null, beanName, new InjectionPoint(field), ex);
		}

在调用resolvedCachedArgument这个方法的时候,最后会一路到DependencyDescriptor.resolveCandidate()这里,里面会有一个beanFactory.getBean()方法,在MapperFactoryBean里面,就会返回一个MapperProxy对象,获取到MapperProxy对象之后,开始真正mybatis的执行

MapperProxy

首先会调用execute方法,可以看到Mybatis里面有个SqlCommand的枚举类,用来定义不同类型的sql语句处理方式:

public Object execute(SqlSession sqlSession, Object[] args) {
    Object result;
    switch (command.getType()) {
      case INSERT: {
    	Object param = method.convertArgsToSqlCommandParam(args);
        result = rowCountResult(sqlSession.insert(command.getName(), param));
        break;
      }
      case UPDATE: {
        Object param = method.convertArgsToSqlCommandParam(args);
        result = rowCountResult(sqlSession.update(command.getName(), param));
        break;
      }

可以看到,里面实际上是用了sqlSession的方法去操作,外层调一个rowCountResult的方法进行影响行数的计算,下面看看sqlSession里面是怎么执行的

  @Override
  public int update(String statement, Object parameter) {
    try {
      dirty = true;
      MappedStatement ms = configuration.getMappedStatement(statement);
      return executor.update(ms, wrapCollection(parameter));
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error updating database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

可以看到里面实际调用了executor.update()的方法执行,具体可自己查询。Mybatis分成两个Executor,一个是BaseExecutor,另一个是CacheExecutor,当第一次执行的时候,会调用BaseExecutor,如果使用了mybatis的cache,会使用CacheExecutor,一般是使用BaseExecutor执行,BaseExecutor实际上是通过SimpleExecutor执行(默认),下面来看看SimpleExecutor的执行(这里还可以使用mybatis提供的拦截器机制,具体可自己上官网查询)

@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
   Statement stmt = null;
   try {
     Configuration configuration = ms.getConfiguration();
     StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
     stmt = prepareStatement(handler, ms.getStatementLog());
     return handler.update(stmt);
   } finally {
     closeStatement(stmt);
   }
}

实际最后也是调用了原生jdbc的PrepareStatement去执行

2.3Sharding jdbc融入在这个mybatis-spring的方式

参考Sharding-jdbc官网,可以看到ShardingJdbc实际上是围绕DataSource来实现的,我们来看看ShardingDataSource和它的ShardingConnection

@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter implements AutoCloseable {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final ShardingContext shardingContext;
    
    private final ShardingProperties shardingProperties;
    
    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule) throws SQLException {
        this(dataSourceMap, shardingRule, new ConcurrentHashMap<String, Object>(), new Properties());

可以看到创建ShardingDataSource的时候会维护一个dataSourceMap,一个ShardingContext上下文,一个ShardingProperties配置类
再看看ShardingConnection和它的父类AbstractConnectionAdapter

public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
    
    private final Map<String, Connection> cachedConnections = new HashMap<>();
    
    private boolean autoCommit = true;
    
    private boolean readOnly = true;
    
    private boolean closed;
    
    private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
    
    private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        if (cachedConnections.containsKey(dataSourceName)) {
            return cachedConnections.get(dataSourceName);
        }
        DataSource dataSource = getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Connection result = dataSource.getConnection();
        cachedConnections.put(dataSourceName, result);
        replayMethodsInvocation(result);
        return result;
    }

可以看到每次getConnection,都会通过一个变量cachedConnections来缓存上次获取到的connection,以便可以下次直接使用不需要重新获取
下面再看下我们最关心的commit和rollback方法

 @Override
    public final void commit() throws SQLException {
        if (TransactionType.LOCAL == TransactionTypeHolder.get()) {
            forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {
            
                @Override
                public void execute(final Connection connection) throws SQLException {
                    connection.commit();
                }
            });
        } else if (TransactionType.XA == TransactionTypeHolder.get()) {
            ShardingEventBusInstance.getInstance().post(new XATransactionEvent(TransactionOperationType.COMMIT));
        }
    }
    
    @Override
    public final void rollback() throws SQLException {
        if (TransactionType.LOCAL == TransactionTypeHolder.get()) {
            forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {
            
                @Override
                public void execute(final Connection connection) throws SQLException {
                    connection.rollback();
                }
            });
        } else if (TransactionType.XA == TransactionTypeHolder.get()) {
            ShardingEventBusInstance.getInstance().post(new XATransactionEvent(TransactionOperationType.ROLLBACK));
        }
    }

默认我们都是使用ShardingJdbc的LOCAL事务,所以可以看到sharding每次commit和rollback都是会对它里面缓存的所有connection进行操作,所以在多数据源的情况下,shardingJdbc能够做到分布式的弱事务

那他们是怎么结合到一起的呢?我们要记得在spring里面配置SqlSessionFactoryBean和DataSourceTransactionManager的时候,我们需要配置datasource,这时候将ShardingDataSource配置进去之后,后面Mybatis通过DataSource获取Connection,和DataSourceTransactionManager里面获取connection一样都是获取的ShardingConnection,整个执行的流程都会围绕这个ShardingConnection执行,后面ShardingJdbc实现的事务和缓存等等都封装在ShardingConnection里面,做到对外来说完全实现普通的jdbc标准

3.总结流程图

ShardingJdbc、Spring和mybatis结合的整个源码执行流程解析