基于DAO层上的分库分表、及mybatis实现
1.架构框架
1. 概述
- 框架采用反射、动态代理、泛型、注解等技术实现;主要设计模式:代理、接口回调、本地线程;本框架主要抽取了分库分表、全局事物的处理,具体业务代码不再涉及分库分表逻辑,同时可以选择性的代理事物控制。
- 分库分表在DAO层实现。不同的O/R模型需要提供各自实现以便支持本框架;现已实现Mybatis的接入。
- 全局事物在Service层进行实现,具体业务如果委托了全局事物的提交、回滚、关闭等操作,则在本层实现。只是代处理了事物,没有真正实现全局事物,个人认为没有什么意义。
- 框架对Service、DAO两层进行“代理”处理,以便进行全局事物、分库分表策略。分库分表提供操作接口,具体内容由业务实现。是否需要事物管理采用注解方式进行标注。
- 提供工具类实现两层代理的单例生成。
- 框架已经实现部分基础操作,例如增删查改,但是只针对实体对象类型进行操作。
- 对Mybatis数据源的加载进行了扩展,支持同时加载多数据源,配置文件不需要进行特殊处理。
- 为保障性能,对反射部分进行了缓存处理。
- Service层的每一个实现都对应一个代理实体,DAO层的每一个实现都对应一个代理实体。
2. 架构图
全局事物贯穿整个上下文的始终
3. 主要流程图
- 全局事物控制流程图
- 数据源事物控制
2.接口说明
dao.sharding.split.action.CallBack<<interface>> |
||
具体业务进行操作获取数据源时实现此接口,以便获取数据源 |
||
doAction(T engine) |
描述 |
具体业务实现 |
参数 |
T engine:数据源,由回调方法推入 |
|
返回值 |
Object obj:操作结果 |
dao.sharding.split.annotation.HolisticController<<Annotation>> |
全局事物注解,标注于Service层方法,对于有此标注的方法,要进行全局事物处理 |
dao.sharding.split.annotation.SplitPolicy<<Annotation>> |
分库分表注解,标注于DAO层方法,对于有此标注的方法,要进行分库分表处理 |
dao.sharding.split.transaction.annotation.Transactional<<Annotation>> |
事物注解,标注于DAO层方法,对于有此标注的方法,要进行事物控制 |
dao.sharding.split.policy.IPolicy<K><<abstract>> |
||
分库分表策略类,具体业务要实现相应业务逻辑 |
||
createKey() |
描述 |
主键生成接口 |
返回值 |
K 生成主键 |
|
getClusterId() |
描述 |
计算集群id接口 |
返回值 |
集群id |
|
getDBId() |
描述 |
计算数据库id接口 |
返回值 |
数据库id |
|
getDBIdsByClusterId(String clusterId) |
描述 |
获取某一集群下的所有数据库 |
参数 |
clusterId:集群id |
|
返回值 |
String[]:数据库id集合 |
|
getTableId() |
描述 |
计算表id接口 |
参数 |
|
|
返回值 |
表id |
|
getTableIdsByDbId(String dbId) |
描述 |
获取某一数据库下的所有表 |
参数 |
dbId:数据库id |
|
返回值 |
String[]:表id集合 |
|
getAllClusterId() |
描述 |
获取所有集群 |
参数 |
|
|
返回值 |
String[]:集群id集合 |
|
getAllDBId() |
描述 |
获取所有数据库 |
参数 |
|
|
返回值 |
String[]:数据库id集合 |
|
getAllTableId() |
描述 |
获取所有表集合 |
参数 |
|
|
返回值 |
String[]:表集合id集合 |
dao.sharding.split.aop.DAOSplitHandler<<class>> extends InvocationHandler |
||
分库分表控制器,基于DAO层拦截器实现,对每个DAO层操作都进行拦截,以实现分库分表、事物管理操作。 1.检查方法是否包含SplitPolicy注解;有:调用相应方法获取数据源,否:使用默认数据源 2.检查方法是否包含Transactional注解;有:需要框架提供事物处理,否:终端用户已经自实现了事物处理。 3.检查上下文环境,如果处于全局事物当中,将事物交由全局事物控制器 |
||
invoke(Object proxy, Method method, Object[] args) |
描述 |
拦截DAO层方法调用,加入分库分表于事物控制;继承方法 |
参数 |
Object proxy:代理对象 Method method:调用方法 Object[] args:方法参数 |
|
返回值 |
|
dao.sharding.split.aop.ServiceSplitHandler<<class>> extends InvocationHandler |
||
全局事物控制器,基于Service层拦截器实现,对每个Service层都进行拦截,加入全局事物控制。 1.检查方法是否包含HolisticController注解,有:此方法需要加入全局事物控制,无:在当前上下文环境中是否为嵌套方法,如果外层调用已经存在全局事物控制,则继承全局事物控制,否则放弃嵌套。 |
||
invoke(Object proxy, Method method, Object[] args) |
描述 |
拦截Service方法,进行全局事物事物控制;继承方法 |
参数 |
Object proxy:代理对象 Method method:调用方法 Object[] args:方法参数 |
|
返回值 |
|
dao.sharding.split.engine.EngineContext<<abstract>> |
||
事物上下文管理类,基于当前线程对象管理全局事物及其数据源 |
||
ThreadLocal<EngineContextStatus> LocalContextStatus |
描述 |
当前线程的上下文,存储数据源的链表队列,支持栈操作 |
push(T engine) |
描述 |
在某方法执行前,将用到的数据源压入上下文栈中 |
参数 |
T engine:数据源对象 |
|
返回值 |
|
|
pop(T engine) |
描述 |
在当前方法执行时,弹出上下文中的数据源 |
参数 |
|
|
返回值 |
T engine:当前方法数据源 |
|
setHolistic(boolean isHolistic) |
描述 |
将当前Service执行方法全局事物状态压入。压入条件:需要全局事物控制的;外层Service调用已经有压入的非标志方法 |
参数 |
boolean isHolistic:是否需要全局事物控制 |
|
返回值 |
|
|
getHolisticNestedLevel() |
描述 |
返回当前Service方法所在全局事物嵌套中的层级。 |
参数 |
|
|
返回值 |
当前层级,如果不存在全局事物上下文,则返回0。 |
|
removeHolistic() |
描述 |
将当前Service方法从全局事务事中弹出 |
参数 |
|
|
返回值 |
int:在堆栈当中的层级 |
|
rollback() |
描述 |
回滚全局事物 |
参数 |
|
|
返回值 |
|
|
commit() |
描述 |
提交全部事物 |
参数 |
|
|
返回值 |
|
|
close() |
描述 |
Close全局事物的所有数据源 |
参数 |
|
|
返回值 |
|
|
clearContext() |
描述 |
清除全局事物上下文 |
参数 |
|
|
返回值 |
|
|
getExistEngine(String index) |
描述 |
检索当前上下文中已经存在、并可使用的数据源,具体实现类实现 |
参数 |
String index:数据源标识 |
|
返回值 |
T:对应数据源 |
|
mergeEngine() |
描述 |
合并当前上下文中同志的数据源 |
参数 |
|
|
返回值 |
|
dao.sharding.split.engine.EngineContext.EngineContextStatus<<class>> |
||
全局事物上下文环境载体类,用于存储当前上下文中信息 |
||
LinkedList<Boolean> holistic |
描述 |
存储全局事物上下文环境 |
LinkedList<T> queue |
描述 |
存储上下文环境中数据源 |
dao.sharding.split.engine.EngineStatus<<abstract>> |
||
数据源状态容器,包含数据源的当前状态信息 |
||
int state |
描述 |
数据源的当前状态 |
int needX |
描述 |
数据源当前的事物状态 |
int count |
描述 |
数据源被调用次数 |
String index |
描述 |
数据源唯一标识 |
statusRollback() |
描述 |
回滚当前数据源事物 |
参数 |
|
|
返回值 |
|
|
statusCommit() |
描述 |
提交当前事物 |
参数 |
|
|
返回值 |
|
|
statusClose() |
描述 |
Close当前数据源 |
参数 |
|
|
返回值 |
|
|
isValid() |
描述 |
当前数据源是否可以重用 |
参数 |
|
|
返回值 |
|
|
getActualEngine() |
描述 |
获取数据源真正实体 |
参数 |
|
|
返回值 |
T :数据源真正实体 |
dao.sharding.split.engine.EngineSupport<<interface>> |
||
实际数据源帮助类 |
||
getEngine(String... params) |
描述 |
根据数据源返回数据源集合 |
参数 |
|
|
返回值 |
Map<String, T>:数据源集合 |
|
getDefaultEngineIndex() |
描述 |
获取默认数据源标识 |
参数 |
|
|
返回值 |
String :默认数据源标识 |
|
getDefaultEngine() |
描述 |
获取默认的数据源 |
参数 |
|
|
返回值 |
Map<String,T> :数据源 |
|
getAllEngines() |
描述 |
当前数据源是否可以重用 |
参数 |
|
|
返回值 |
Map<String,T>:获取所有数据源 |
|
getAllEnginesIndexs() |
描述 |
获取数据源真正实体 |
参数 |
|
|
返回值 |
String[] :获取所有数据源的标识 |
dao.sharding.dao.impl.mybatis.action.MybatisCallBack<<interface>> Implements Callback |
Mybatis实现动作方法 |
dao.sharding.dao.impl.mybatis.engine.MultipleSqlSessionFactoryBuilder<<class>> extends SqlSessionFactoryBuilder |
||
扩展mybatis工厂类,实现多数据源同时加载 |
||
buildMulti(Reader reader) |
描述 |
同时加载多数据源 |
参数 |
|
|
返回值 |
Map<String,SqlSessionFactory>:全部数据源 |
dao.sharding.dao.impl.mybatis.engine.MybatisEngineContext<<class>> extends EngineContext |
Mybatis 实现分库分表EngineContext |
dao.sharding.dao.impl.mybatis.engine.MybatisEngineStatus<<class>> extends EngineStatus |
Mybatis 实现分库分表EngineStatus |
dao.sharding.dao.impl.mybatis.engine.MybatisEngineSupport<<class>> extends EngineSupport |
Mybatis 实现分库分表EngineSupport |
dao.sharding.dao.impl.mybatis.support.DAOLoader |
||
加载DAO层类的代理实例,并实现单例模式 |
||
getSingleTon(Class<?> clazz) |
描述 |
扫描DAO层,加载所有DAO层 |
参数 |
|
|
返回值 |
Object:代理实体 |
dao.sharding.service.support.ServiceLoader |
||
加载Service层类的代理实例,并实现单例模式 |
||
getSingleTon(Class<?> clazz) |
描述 |
扫描DAO层,加载所有Service层代理 |
参数 |
|
|
返回值 |
Object:代理实体 |
3.事物处理策略
- 对实际的引擎进行封装代理,拦截用户的实际操作,拦截用户的Rollback、Commit、Close操作;并对异常进行处理,有异常的情况并且标识了事物标签的需要自动进行回退操作,并且将该异常继续抛出。当用户手动进行了以上操作后,此引擎在当前线程中处于不可再用状态,并标识为上述已经执行的状态。
- 对于在service层未进行全局控制标识的,在DAO层将每次操作后的引擎在DAO层直接进行Rollback、Commit、Close操作。
- 对于在service层进行全局控制标识的,在DAO层不做处理,交由service服务代理处理。
- Service层进行全局控制标识的,如果内嵌了其他service层操作,内嵌的其他方法不做全局控制,交由最外层Service方法进行处理。
- 对于insert、update、delete操作,自动添加事物处理。
- 暂时不支持内部嵌套的全局事物控制。
4.使用说明
- 实体bean继承Ipolicy接口,实现里里面的分库分表策略,例如:
public class TbReceiverAddress extends IPolicy<Long>
实现以下接口
/**
* generate primary key
* @return
*/
public abstract K createKey();
/**
* get cluster id
* @param id
* @return
*/
public abstract String getClusterId();
/**
* get database id
* @param id
* @return
*/
public abstract String getDBId();
/**
* get databases id by cluster id
* @param clusterId
* @return
*/
public abstract String[] getDBIdsByClusterId(String clusterId);
/**
* get table id
* @param id
* @return
*/
public abstract String getTableId();
/**
* get tables id by database id
* @return
*/
public abstract String[] getTableIdsByDbId(String id);
/**
* get all clusters id
* @return
*/
public abstract String[] getAllClusterId();
/**
* get all databases id
* @return
*/
public abstract String[] getAllDBId();
/**
* get all tables id
* @return
*/
Public abstract String[] getAllTableId();
- DAO层实现必须继BaseDAOImpl类且实现相应动作接口定义,以便实现动态代理功能,BaseDAOImpl里面包含了上下文环境中的数据源调用:
/**
* executor
* @throws Exception
*/
public Object doAction(MybatisCallBack callback) throws Exception{
//get the SqlSession object according to the context
MybatisEngineStatus session = MybatisEngineContext.singleTon().pop();
if(session==null){
throw new Exception("there is no SqlSession in the context, should push a SqlSession to the context first");
}
Object objRes = callback.doAction(session);
//check the session state , whether end user has commit or close the session
session.setState(EngineStatus.ENGINE_RUNNING);
//TODO later need to implements the function of multiple thread to get data from all data sources
return objRes;
}
自定义DAO层方法时如果需要全局控制时,业务实现为内部类调用,例如下:
@SplitPolicy(type=OperationType.INSERT)
@Transactional()
@SuppressWarnings("unchecked")
public K insert(T object) throws Exception{
final T obj = object;
return (K) this.doAction(new MybatisCallBack(){
public Object doAction(SqlSession session) {
int insert = session.insert(clazz.getName()+".insert", obj);
return insert;
}
});
}
如果需要分库分表策略,请标注注解:
@SplitPolicy(type=OperationType.INSERT)
如果需要事物控制,请标注注解:
@Transactional()
配置文件中配置DAO层实现的包路径,控件提供了扫描给定文件夹,并自己动加载DAO层动态代理实例功能
l dao.base.package=dao.sharding.dao.impl.mybatis
ldao.sharding.dao.impl.mybatis.support.DAOLoader.getSingleTon(Class<?> clazz)
- Service层实现必须实现相应接口定义以便实现动态代理,对于需要实现全局控制的接口,请标注@HolisticController,例:
public class AddressServiceImpl implements IAddressService {
private IAddressDAO addressDAO = (IAddressDAO) DAOLoader.getSingleTon(IAddressDAO.class);
@HolisticController
public Integer addAddress(TbReceiverAddress address) {
try{
Integer intId = this.addressDAO.insert(address);
return intId;
}catch(Exception e){
e.printStackTrace();
}
return null;
}
配置文件中配置Service层实现的包路径,控件提供了扫描给定文件夹,并自己动加载Service层动态代理实例功能
l service.base.package=dao.sharding.service.impl
ldao.sharding.service.support.ServiceLoader.getSingleTon(Class<?> clazz
5.后续优化
- 完善日志以及异常处理机制。
- 抽取跨表查询服务的多路归并方法。