分库分表思路
总体思路和切入点:
1. 在 Spring 数据访问封装层,通过动态代理无侵入地扩展代码,加入分库分表策略。
(1)分库:通过动态代理扩展 SqlSession,根据传入的分库参数选择对应数据源的 SqlSessionTemplate,从而实现分库策略。
/**
 * Base class for DAOs that need a sharding-aware {@link SqlSession}.
 *
 * <p>The session returned by {@link #getSqlSession()} is a JDK dynamic proxy. For every call the
 * proxy inspects the arguments: when the first argument is a statement id ({@code String}) and the
 * second one is a {@code ShardParam}, the strategy registered under the parameter's name selects
 * the target data source and the call is forwarded to the {@code SqlSessionTemplate} bound to that
 * data source. Every other call is forwarded to the template of the main data source.
 */
public abstract class SqlSessionDaoSupport implements InitializingBean {

    private SqlSessionFactoryBean sqlSessionFactoryBean;

    /** One template per known data source (main first, then the shards). */
    private Map<DataSource, SqlSessionTemplate> dataSourceMap;

    private SqlSession sqlSession;

    {
        // Extend SqlSession through a dynamic proxy so the shard parameter passed by the caller
        // can be used to choose the SqlSessionTemplate (and therefore the data source).
        sqlSession = (SqlSession) Proxy.newProxyInstance(SqlSessionDaoSupport.class.getClassLoader(),
                new Class[] { SqlSession.class }, new SessionHandler());
    }

    /**
     * Injects the factory bean that knows the main/shard data sources and strategies.
     *
     * @param sqlSessionFactoryBean the factory bean to use
     */
    @Autowired(required = false)
    public final void setSqlSessionFactory(SqlSessionFactoryBean sqlSessionFactoryBean) {
        this.sqlSessionFactoryBean = sqlSessionFactoryBean;
    }

    /**
     * Returns the sharding-aware session proxy.
     *
     * @return the proxied {@link SqlSession}
     */
    public final SqlSession getSqlSession() {
        return sqlSession;
    }

    /**
     * Initialises the factory bean and builds one {@code SqlSessionTemplate} per data source.
     *
     * @throws IllegalStateException if no {@code SqlSessionFactoryBean} has been injected
     * @throws Exception             if the factory bean fails to initialise
     */
    @Override
    public final void afterPropertiesSet() throws Exception {
        if (sqlSessionFactoryBean == null) {
            // The setter is @Autowired(required = false); fail with a clear message instead of
            // a bare NullPointerException.
            throw new IllegalStateException("Property 'sqlSessionFactory' is required");
        }
        // NOTE(review): this re-runs the factory bean's initialisation for every DAO instance;
        // confirm that repeated initialisation of the shared bean is intended.
        sqlSessionFactoryBean.afterPropertiesSet();

        dataSourceMap = new LinkedHashMap<DataSource, SqlSessionTemplate>();
        dataSourceMap.put(sqlSessionFactoryBean.getMainDataSource(),
                new SqlSessionTemplate(sqlSessionFactoryBean.getMainSqlSessionFactory()));

        Map<String, DataSource> shardDataSources = sqlSessionFactoryBean.getShardDataSources();
        if (shardDataSources != null) {
            for (Entry<String, DataSource> entry : shardDataSources.entrySet()) {
                SqlSessionFactory sqlSessionFactory = sqlSessionFactoryBean.getShardSqlSessionFactory().get(
                        entry.getKey());
                dataSourceMap.put(entry.getValue(), new SqlSessionTemplate(sqlSessionFactory));
            }
        }
    }

    /**
     * Invocation handler behind the {@link SqlSession} proxy: routes each call to the template of
     * the data source chosen by the shard strategy.
     */
    private class SessionHandler implements InvocationHandler {

        /**
         * Routes one {@link SqlSession} call.
         *
         * @param proxy  the proxy instance (unused)
         * @param method the {@link SqlSession} method being called
         * @param args   the call arguments; {@code args[1]} is replaced by the real statement
         *               parameters when it is a {@code ShardParam}
         * @return whatever the target template returns
         * @throws Throwable the exception thrown by the target template, unwrapped
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
                // The main data source is the default target.
                DataSource mainDS = sqlSessionFactoryBean.getMainDataSource();

                ShardParam shardParam = extractShardParam(args);
                if (shardParam == null) {
                    // Not a sharded call: run against the main data source.
                    prepareTx(mainDS);
                    return method.invoke(dataSourceMap.get(mainDS), args);
                }

                // Hand the real statement parameters to MyBatis instead of the ShardParam wrapper.
                args[1] = shardParam.getParams();

                String statement = (String) args[0];
                ShardStrategy shardStrategy = sqlSessionFactoryBean.getShardStrategyMap().get(shardParam.getName());
                if (shardStrategy == null) {
                    shardStrategy = NoShardStrategy.INSTANCE;
                }

                Configuration configuration = sqlSessionFactoryBean.getMainSqlSessionFactory().getConfiguration();
                MappedStatement mappedStatement = configuration.getMappedStatement(statement);
                BoundSql boundSql = mappedStatement.getBoundSql(wrapCollection(shardParam.getParams()));

                // NOTE(review): the strategy instance is looked up from a shared map and mutated
                // on every call; confirm that strategies are safe for concurrent use.
                shardStrategy.setMainDataSource(mainDS);
                shardStrategy.setShardDataSources(sqlSessionFactoryBean.getShardDataSources());
                shardStrategy.setShardParam(shardParam);
                shardStrategy.setSql(boundSql.getSql());

                StrategyHolder.setShardStrategy(shardStrategy);

                // Re-select the target data source; fall back to the main one when the strategy
                // returns nothing or an unknown data source.
                DataSource targetDS = shardStrategy.getTargetDataSource();
                SqlSessionTemplate sqlSessionTemplate = (targetDS == null) ? null : dataSourceMap.get(targetDS);
                if (sqlSessionTemplate == null) {
                    targetDS = mainDS;
                    sqlSessionTemplate = dataSourceMap.get(mainDS);
                }

                prepareTx(targetDS);
                return method.invoke(sqlSessionTemplate, args);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Method.invoke wraps whatever the template threw. Rethrow the original so callers
                // see the real exception rather than an UndeclaredThrowableException.
                Throwable cause = e.getCause();
                throw (cause != null) ? cause : e;
            } finally {
                StrategyHolder.removeShardStrategy();
            }
        }

        /**
         * Returns the {@code ShardParam} of a sharded call, or {@code null} when the call is not
         * of the form {@code (String statement, ShardParam param, ...)}.
         */
        private ShardParam extractShardParam(Object[] args) {
            if (args == null || args.length < 2) {
                return null;
            }
            if (!(args[0] instanceof String)) {
                return null;
            }
            return (args[1] instanceof ShardParam) ? (ShardParam) args[1] : null;
        }

        /**
         * Prepares the transaction for the given data source: records it in
         * {@code TransactionHolder} and, when transaction info with a transaction attribute is
         * present, creates a transaction for that data source if none exists yet.
         *
         * @param targetDS the data source the call is about to run against
         */
        private void prepareTx(DataSource targetDS) {
            TransactionHolder.setDataSource(targetDS);
            // for transaction
            TransactionInfoWrap txInfo = TransactionHolder.getTransactionInfo();
            if (txInfo != null) {
                TransactionAttribute attr = txInfo.getTransactionAttribute();
                if (attr != null) {
                    createTxIfAbsent(targetDS, txInfo);
                }
            }
        }

        /**
         * Creates a transaction for the data source unless the transaction tree already holds one.
         *
         * @param targetDS the data source to check
         * @param txInfo   the current transaction info
         */
        private void createTxIfAbsent(DataSource targetDS, TransactionInfoWrap txInfo) {
            Map<DataSource, LinkedList<TransactionInfoWrap>> txTree = TransactionHolder.getTxTree();
            if (txTree == null || !txTree.containsKey(targetDS)) {
                createTx(targetDS, txInfo);
            }
        }

        /**
         * Obtains a transaction from the transaction manager and registers its status, together
         * with a copy of the transaction info, in {@code TransactionHolder} for the data source.
         *
         * @param targetDS the data source the transaction belongs to
         * @param txInfo   the transaction info to copy
         */
        private void createTx(DataSource targetDS, TransactionInfoWrap txInfo) {
            TransactionStatus txStatus = txInfo.getTransactionManager()
                    .getTransaction(txInfo.getTransactionAttribute());
            TransactionHolder.addStatusDS(txStatus, targetDS);

            TransactionInfoWrap txInfoCopy = txInfo.newCopy();
            txInfoCopy.newTransactionStatus(txStatus);

            TransactionHolder.addTxInfo2Tree(targetDS, txInfoCopy);
        }

        /**
         * Wraps a {@link List} or array parameter in a map under the key {@code "list"} or
         * {@code "array"} respectively; any other value is returned unchanged.
         *
         * @param object the statement parameter, may be {@code null}
         * @return the wrapped parameter, or {@code object} itself
         */
        private Object wrapCollection(final Object object) {
            if (object instanceof List) {
                Map<String, Object> wrapped = new HashMap<String, Object>();
                wrapped.put("list", object);
                return wrapped;
            }
            if (object != null && object.getClass().isArray()) {
                Map<String, Object> wrapped = new HashMap<String, Object>();
                wrapped.put("array", object);
                return wrapped;
            }
            return object;
        }
    }
}
(2)分表:通过自定义 MyBatis 拦截器(注册到 Configuration),根据传入的分表参数替换 BoundSql 中的 SQL,从而实现分表策略。
/**
 * Builds one MyBatis {@link SqlSessionFactory} for the main data source and one per shard data
 * source, and instantiates the configured shard strategies.
 *
 * <p>Every factory is built with a {@code ShardPlugin} interceptor registered on its
 * {@link Configuration}; the plugin receives this bean's {@code SqlConverter}.
 */
public class SqlSessionFactoryBean implements ApplicationContextAware, MultiDataSourceSupport {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private ApplicationContext applicationContext;
    private DataSource mainDataSource;
    private SqlSessionFactory mainSqlSessionFactory;
    /** Shard data sources keyed by their bean name in the application context. */
    private Map<String, DataSource> shardDataSources;
    /** Shard factories keyed by the same bean name as {@link #shardDataSources}. */
    private Map<String, SqlSessionFactory> shardSqlSessionFactory;
    private List<DataSource> shardDataSourceList;
    private Resource[] mapperLocations;
    private Map<String, ShardStrategy> shardStrategyMap = new HashMap<String, ShardStrategy>();
    /** Strategy name to strategy class; consumed (set to null) by {@link #afterPropertiesSet()}. */
    private Map<String, Class<?>> shardStrategyConfig = new HashMap<String, Class<?>>();
    private SqlConverter sqlConverter = new DefaultSqlConverter();

    public DataSource getMainDataSource() {
        return mainDataSource;
    }

    /**
     * Sets the main data source. A {@link TransactionAwareDataSourceProxy} is replaced by its
     * target so that transactions are performed for the underlying data source; otherwise data
     * access code would not see properly exposed transactions.
     *
     * @param mainDataSource the main data source
     */
    public void setMainDataSource(DataSource mainDataSource) {
        this.mainDataSource = unwrapTransactionAwareProxy(mainDataSource);
    }

    public void setShardDataSourceList(List<DataSource> shardDataSourceList) {
        this.shardDataSourceList = shardDataSourceList;
    }

    public Map<String, DataSource> getShardDataSources() {
        return shardDataSources;
    }

    public void setMapperLocations(Resource[] mapperLocations) {
        this.mapperLocations = mapperLocations;
    }

    /**
     * Sets the strategy configuration (strategy name to strategy class). The classes are
     * instantiated in {@link #afterPropertiesSet()}.
     *
     * @param shardStrategyMap strategy name to {@code ShardStrategy} implementation class
     */
    public void setShardStrategy(Map<String, Class<?>> shardStrategyMap) {
        this.shardStrategyConfig = shardStrategyMap;
    }

    public SqlSessionFactory getMainSqlSessionFactory() {
        return mainSqlSessionFactory;
    }

    public Map<String, SqlSessionFactory> getShardSqlSessionFactory() {
        return shardSqlSessionFactory;
    }

    public Map<String, ShardStrategy> getShardStrategyMap() {
        return shardStrategyMap;
    }

    /**
     * Resolves the shard data sources, defaults the main data source to the first shard when none
     * was set, builds all session factories and instantiates the configured strategies.
     *
     * @throws RuntimeException         if neither a main data source nor any shard data source is
     *                                  configured, or a strategy class cannot be instantiated
     * @throws IllegalArgumentException if a configured strategy class is not a
     *                                  {@code ShardStrategy}
     * @throws Exception                if a session factory cannot be built
     */
    public void afterPropertiesSet() throws Exception {
        boolean hasShards = shardDataSourceList != null && shardDataSourceList.size() > 0;
        if (mainDataSource == null && !hasShards) {
            throw new RuntimeException(
                    " Property 'mainDataSource' and property 'shardDataSourceList' can not be null together! ");
        }

        if (hasShards) {
            // Key each configured shard by its bean name: match the list entries by identity
            // against all DataSource beans in the context.
            shardDataSources = new LinkedHashMap<String, DataSource>();
            Map<String, DataSource> dataSourceBeans = applicationContext.getBeansOfType(DataSource.class);
            for (Entry<String, DataSource> entry : dataSourceBeans.entrySet()) {
                for (DataSource ds : shardDataSourceList) {
                    if (entry.getValue() == ds) {
                        shardDataSources.put(entry.getKey(), unwrapTransactionAwareProxy(entry.getValue()));
                    }
                }
            }
        }

        if (mainDataSource == null) {
            // No explicit main data source: use the first configured shard. (The guard above
            // ensures the list is non-empty here.) The previous code looked the data source up
            // with shardDataSources.get(0) on a String-keyed map, which always returned null.
            this.mainDataSource = unwrapTransactionAwareProxy(shardDataSourceList.get(0));
        }

        this.mainSqlSessionFactory = buildSqlSessionFactory(getMainDataSource());

        if (getShardDataSources() != null && getShardDataSources().size() > 0) {
            shardSqlSessionFactory = new LinkedHashMap<String, SqlSessionFactory>(getShardDataSources().size());
            for (Entry<String, DataSource> entry : getShardDataSources().entrySet()) {
                shardSqlSessionFactory.put(entry.getKey(), buildSqlSessionFactory(entry.getValue()));
            }
        }

        if (shardStrategyConfig != null) {
            shardStrategyMap = new HashMap<String, ShardStrategy>();
            for (Map.Entry<String, Class<?>> entry : shardStrategyConfig.entrySet()) {
                Class<?> clazz = entry.getValue();
                if (!ShardStrategy.class.isAssignableFrom(clazz)) {
                    throw new IllegalArgumentException("class " + clazz.getName()
                            + " is illegal, subclass of ShardStrategy is required.");
                }
                try {
                    shardStrategyMap.put(entry.getKey(), (ShardStrategy) clazz.newInstance());
                } catch (Exception e) {
                    // Keep the original exception as the cause so the stack trace is not lost.
                    throw new RuntimeException("new instance for class " + clazz.getName() + " failed, error:"
                            + e.getMessage(), e);
                }
            }
            // The configuration has been consumed; later calls keep the strategies built here.
            shardStrategyConfig = null;
        }
    }

    /**
     * Returns the target of a {@link TransactionAwareDataSourceProxy}, or the data source itself
     * when it is not such a proxy.
     */
    private static DataSource unwrapTransactionAwareProxy(DataSource dataSource) {
        if (dataSource instanceof TransactionAwareDataSourceProxy) {
            return ((TransactionAwareDataSourceProxy) dataSource).getTargetDataSource();
        }
        return dataSource;
    }

    /**
     * Builds a {@link SqlSessionFactory} for one data source: registers the shard plugin, sets a
     * Spring-managed transaction environment and parses all configured mapper files.
     *
     * @param dataSource the data source the factory is bound to
     * @return the new session factory
     * @throws IOException if a mapper resource cannot be parsed
     */
    private SqlSessionFactory buildSqlSessionFactory(DataSource dataSource) throws IOException {
        ShardPlugin plugin = new ShardPlugin();
        plugin.setSqlConverter(sqlConverter);

        Configuration configuration = new Configuration();
        // Table sharding: the interceptor registered on the Configuration uses the shard
        // parameter to replace the SQL of the BoundSql.
        configuration.addInterceptor(plugin);

        SpringManagedTransactionFactory transactionFactory = new SpringManagedTransactionFactory(dataSource);
        Environment environment = new Environment(SqlSessionFactoryBean.class.getSimpleName(), transactionFactory,
                dataSource);
        configuration.setEnvironment(environment);

        if (!ObjectUtils.isEmpty(this.mapperLocations)) {
            for (Resource mapperLocation : this.mapperLocations) {
                if (mapperLocation == null) {
                    continue;
                }
                // this block is a workaround for issue
                // http://code.google.com/p/mybatis/issues/detail?id=235
                // when running MyBatis 3.0.4. But not always works.
                // Not needed in 3.0.5 and above.
                String path;
                if (mapperLocation instanceof ClassPathResource) {
                    path = ((ClassPathResource) mapperLocation).getPath();
                } else {
                    path = mapperLocation.toString();
                }
                try {
                    XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(),
                            configuration, path, configuration.getSqlFragments());
                    xmlMapperBuilder.parse();
                } catch (Exception e) {
                    throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e);
                } finally {
                    ErrorContext.instance().reset();
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Parsed mapper file: '" + mapperLocation + "'");
                }
            }
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Property 'mapperLocations' was not specified or no matching resources found");
            }
        }
        return new SqlSessionFactoryBuilder().build(configuration);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
3. 为了便于理清思路,附上一张 MyBatis 执行 SQL 的时序图: