Jdbc怎么实现分布式事务数据源动态切换

本篇内容介绍了“Jdbc怎么实现分布式事务数据源动态切换”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一:依赖的jar包  Maven配置

<dependency>

  1.             <groupId>com.atomikos</groupId>

  2.             <artifactId>transactions</artifactId>

  3.             <version>4.0.4</version>

  4.         </dependency>

  5.         <dependency>

  6.             <groupId>com.atomikos</groupId>

  7.             <artifactId>transactions-api</artifactId>

  8.             <version>4.0.4</version>

  9.         </dependency>

  10.         <dependency>

  11.             <groupId>com.atomikos</groupId>

  12.             <artifactId>atomikos-util</artifactId>

  13.             <version>4.0.4</version>

  14.         </dependency>

  15.         <dependency>

  16.             <groupId>com.atomikos</groupId>

  17.             <artifactId>transactions-jdbc-deprecated</artifactId>

  18.             <version>3.8.0</version>

  19.         </dependency>

  20.         <dependency>

  21.             <groupId>com.atomikos</groupId>

  22.             <artifactId>transactions-jta</artifactId>

  23.             <version>4.0.4</version>

  24.         </dependency>

  25.         <dependency>

  26.             <groupId>com.atomikos</groupId>

  27.             <artifactId>transactions-jdbc</artifactId>

  28.             <version>4.0.4</version>

  29.         </dependency>


  30.         <dependency>

  31.             <groupId>cglib</groupId>

  32.             <artifactId>cglib-nodep</artifactId>

  33.             <version>3.2.5</version>

  34.         </dependency>


  35.         <dependency>

  36.             <groupId>javax.transaction</groupId>

  37.             <artifactId>jta</artifactId>

  38.             <version>1.1</version>

  39.         </dependency>

二:配置

dataSource.properties

点击(此处)折叠或打开

  1. workDesk.jdbc.driverclass=com.mysql.jdbc.Driver

  2. workDesk.jdbc.url=jdbc:mysql://10.243.3.18:3306/system?useUnicode=true&characterEncoding=UTF-8

  3. workDesk.jdbc.username=root

  4. workDesk.jdbc.password=$Fortune2015

  5. workDesk.jdbc.poolsize.max=3

  6. workDesk.jdbc.poolsize.min=3

  7. workDesk.jdbc.poolsize.initial=2

  8. workDesk.jdbc.idletime.max=25000

  9. workDesk.jdbc.idleConnectionTestPeriod=18000


  10. #-------workDesk jdbc--------

  11. workDesk.read.jdbc.driverclass=com.mysql.jdbc.Driver

  12. workDesk.read.jdbc.url=jdbc:mysql://112.74.53.213:3306/gmc?useUnicode=true&characterEncoding=UTF-8

  13. workDesk.read.jdbc.username=root

  14. workDesk.read.jdbc.password=Wanmide@123

  15. workDesk.read.jdbc.poolsize.max=3

  16. workDesk.read.jdbc.poolsize.min=3

  17. workDesk.read.jdbc.poolsize.initial=2

  18. workDesk.read.jdbc.idletime.max=25000

  19. workDesk.read.jdbc.idleConnectionTestPeriod=18000


  20. jdbc.xaDataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

transactions.properties


点击(此处)折叠或打开

  1. # SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE

  2. # THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER

  3. # UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;


  4. # Required: factory implementation class of the transaction core.

  5. # NOTE: there is no default for this, so it MUST be specified!

  6. #

  7. com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory


  8.     

  9. # Set base name of file where messages are output

  10. # (also known as the 'console file').

  11. #

  12. # com.atomikos.icatch.console_file_name = tm.out


  13. # Size limit (in bytes) for the console file;

  14. # negative means unlimited.

  15. #

  16. # com.atomikos.icatch.console_file_limit=-1


  17. # For size-limited console files, this option

  18. # specifies a number of rotating files to

  19. # maintain.

  20. #

  21. # com.atomikos.icatch.console_file_count=1


  22. # Set the number of log writes between checkpoints

  23. #

  24. # com.atomikos.icatch.checkpoint_interval=500


  25. # Set output directory where console file and other files are to be put

  26. # make sure this directory exists!

  27. #

  28. # com.atomikos.icatch.output_dir = ./


  29. # Set directory of log files; make sure this directory exists!

  30. #

  31.  com.atomikos.icatch.log_base_dir = ./


  32. # Set base name of log file

  33. # this name will be  used as the first part of

  34. # the system-generated log file name

  35. #

  36. # com.atomikos.icatch.log_base_name = tmlog


  37. # Set the max number of active local transactions

  38. # or -1 for unlimited.

  39. #

  40. # com.atomikos.icatch.max_actives = 50


  41. # Set the default timeout (in milliseconds) for local transactions

  42. #

  43. # com.atomikos.icatch.default_jta_timeout = 10000


  44. # Set the max timeout (in milliseconds) for local transactions

  45. #

  46. # com.atomikos.icatch.max_timeout = 300000


  47. # The globally unique name of this transaction manager process

  48. # override this value with a globally unique name

  49. #

  50. # com.atomikos.icatch.tm_unique_name = tm


  51. # Do we want to use parallel subtransactions? JTA's default is NO for J2EE compatibility.

点击(此处)折叠或打开

  1. /**

  2.      * Atomikos 数据源A

  3.      * @return

  4.      */

  5.     @Bean(name="dataSourceA",initMethod="init",destroyMethod="close")

  6.     public AtomikosDataSourceBean dataSourceA()

  7.     {

  8.         AtomikosDataSourceBean dataSourceA=new AtomikosDataSourceBean();

  9.         dataSourceA.setUniqueResourceName("dataSourceA");

  10.         dataSourceA.setXaDataSourceClassName(xaDataSourceClassName);

  11.         Properties xaProperties = new Properties();

  12.         xaProperties.put("user", user);

  13.         xaProperties.put("password", password);

  14.         xaProperties.put("url", jdbcUrl);

  15.         xaProperties.put("pinGlobalTxToPhysicalConnection", true);

  16.         dataSourceA.setXaProperties(xaProperties);

  17.         dataSourceA.setMaxPoolSize(maxPoolSize);

  18.         dataSourceA.setMinPoolSize(minPoolSize);

  19.         dataSourceA.setMaxIdleTime(maxIdleTime);

  20.         dataSourceA.setTestQuery("SELECT 1");

  21.         return dataSourceA;

  22.     }

  23.     

  24.     

  25.     /**

  26.      * Atomikos 数据源A

  27.      * @return

  28.      */

  29.     @Bean(name="dataSourceB",initMethod="init",destroyMethod="close")

  30.     public AtomikosDataSourceBean dataSourceB()

  31.     {

  32.         AtomikosDataSourceBean dataSourceA=new AtomikosDataSourceBean();

  33.         dataSourceA.setUniqueResourceName("dataSourceB");

  34.         dataSourceA.setXaDataSourceClassName(xaDataSourceClassName);

  35.         Properties xaProperties = new Properties();

  36.         xaProperties.put("user", readUser);

  37.         xaProperties.put("password", readPassword);

  38.         xaProperties.put("url", readJdbcUrl);

  39.         xaProperties.put("pinGlobalTxToPhysicalConnection", true);

  40.         dataSourceA.setXaProperties(xaProperties);

  41.         dataSourceA.setMaxPoolSize(readMaxPoolSize);

  42.         dataSourceA.setMinPoolSize(readMinPoolSize);

  43.         dataSourceA.setMaxIdleTime(readMaxIdleTime);

  44.         dataSourceA.setTestQuery("SELECT 1");

  45.         return dataSourceA;

  46.     }

点击(此处)折叠或打开

  1. @Configuration

  2. public class DynamicTransactionManagerElConfig {


  3.     // @Autowired

  4.     // @Qualifier("platformTomcat")

  5.     // private DataSource platformTomcat;

  6.     //

  7.     // @Autowired

  8.     // @Qualifier("platformReadTomcat")

  9.     // private DataSource platformReadTomcat;


  10.     @Autowired

  11.     @Qualifier("dataSourceA")

  12.     private DataSource dataSourceA;


  13.     @Autowired

  14.     @Qualifier("dataSourceB")

  15.     private DataSource dataSourceB;


  16.     @Bean(name = "dataSource")

  17.     public DynamicDataSource dataSource() {

  18.         DynamicDataSource dataSource = new DynamicDataSource();

  19.         Map<Object, Object> targetDataSources = new HashMap<>();

  20.         targetDataSources.put("master", dataSourceA);

  21.         targetDataSources.put("slave", dataSourceB);

  22.         dataSource.setTargetDataSources(targetDataSources);

  23.         dataSource.setDefaultTargetDataSource(dataSourceA);

  24.         return dataSource;

  25.     }


  26.     @Bean(name = "jdbcTemplate")

  27.     public JdbcTemplate jdbcTemplate(DynamicDataSource dataSource) {

  28.         JdbcTemplate jdbcTemplate = new JdbcTemplate();

  29.         jdbcTemplate.setDataSource(dataSource);

  30.         return jdbcTemplate;

  31.     }


  32.     @Bean(name = "jdbcReadTemplate")

  33.     public JdbcTemplate jdbcReadTemplate(DynamicDataSource dataSource) {

  34.         JdbcTemplate jdbcReadTemplate = new JdbcTemplate();

  35.         jdbcReadTemplate.setDataSource(dataSource);

  36.         return jdbcReadTemplate;

  37.     }


  38.     @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")

  39.     public UserTransactionManager atomikosTransactionManager() {

  40.         UserTransactionManager atomikosTransactionManager = new UserTransactionManager();

  41.         atomikosTransactionManager.setForceShutdown(true);

  42.         return atomikosTransactionManager;

  43.     }


  44.     @Bean(name = "atomikosUserTransaction")

  45.     public UserTransactionImp atomikosUserTransaction() {

  46.         UserTransactionImp atomikosUserTransaction = new UserTransactionImp();

  47.         try {

  48.             atomikosUserTransaction.setTransactionTimeout(300);

  49.         }

  50.         catch (SystemException e) {

  51.             e.printStackTrace();

  52.         }

  53.         return atomikosUserTransaction;

  54.     }


  55.     // @Bean(name = "transactionManager")

  56.     // public DataSourceTransactionManager transactionManager(DynamicDataSource

  57.     // dataSource) {

  58.     // DataSourceTransactionManager transactionManager = new

  59.     // DataSourceTransactionManager();

  60.     // transactionManager.setDataSource(dataSource);

  61.     // return transactionManager;

  62.     // }


  63.     @Bean(name = "transactionManager")

  64.     public JtaTransactionManager transactionManager(UserTransactionManager atomikosTransactionManager,

  65.             UserTransactionImp atomikosUserTransaction) {

  66.         JtaTransactionManager transactionManager = new JtaTransactionManager();

  67.         transactionManager.setTransactionManager(atomikosTransactionManager);

  68.         transactionManager.setUserTransaction(atomikosUserTransaction);

  69.         transactionManager.setAllowCustomIsolationLevels(true);

  70.         return transactionManager;

  71.     }


  72. }

三:AOP 注解方式数据源动态切换


点击(此处)折叠或打开

  1. @Retention(RetentionPolicy.RUNTIME)

  2. @Target(ElementType.METHOD)

  3. @Documented

  4. public @interface DataSource {

  5.     String value();

  6. }

点击(此处)折叠或打开

  1. public class DynamicDataSource extends AbstractRoutingDataSource {


  2.     @Override

  3.     protected Object determineCurrentLookupKey() {

  4.         return DataSourceContextHolder.getDataSource();

  5.     }


  6. }

点击(此处)折叠或打开

  1. public class DataSourceContextHolder {


  2.     private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();


  3.     public static void setDataSource(String dataSource) {

  4.         contextHolder.set(dataSource);

  5.     }


  6.     public static String getDataSource() {

  7.         return contextHolder.get();

  8.     }


  9.     public static void removeDataSource() {

  10.         contextHolder.remove();

  11.     }


  12. }

点击(此处)折叠或打开

  1. @Aspect

  2. @Order(1)

  3. @Component

  4. public class DataSourceAspect {


  5.     @Pointcut("@annotation(com.gemdale.ghome.business.async.deal.center.demo.datasource.DataSource)")

  6.     public void dataSourcePointCut() {

  7.     };


  8.     @Before("dataSourcePointCut()")

  9.     public void before(JoinPoint joinPoint) {


  10.         System.out.println("=============dataSourcePointCut:before=============");

  11.         Object target = joinPoint.getTarget();

  12.         String method = joinPoint.getSignature().getName();


  13.         // Class[] classz = target.getClass().getInterfaces();

  14.         Class<?> classz = target.getClass();

  15.         Class<?>[] parameterTypes = ((MethodSignature) joinPoint.getSignature()).getMethod().getParameterTypes();


  16.         try {

  17.             // Method m = classz[0].getMethod(method, parameterTypes);

  18.             Method m = classz.getMethod(method, parameterTypes);

  19.             if (null != m && m.isAnnotationPresent(DataSource.class)) {

  20.                 DataSource dataSource = m.getAnnotation(DataSource.class);

  21.                 DataSourceContextHolder.setDataSource(dataSource.value());


  22.                 System.out.println("=============dataSource:" + dataSource.value());

  23.             }

  24.         }

  25.         catch (Exception e) {

  26.             e.printStackTrace();

  27.         }

  28.     }


  29. }

四:数据源动态切换实例

点击(此处)折叠或打开

  1. @Repository("gmcSmsInfoDaoImpl")

  2. public class GmcSmsInfoDaoImpl extends BaseDaoSupport implements GmcSmsInfoDAO{


  3.     /* (non-Javadoc)

  4.      * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addMaster(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  5.      */

  6.     @Override

  7.     @DataSource("master")

  8.     public Integer addMaster(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  9.         return save(smsInfo);

  10.     }


  11.     /* (non-Javadoc)

  12.      * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addSlave(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  13.      */

  14.     @Override

  15.     @DataSource("slave")

  16.     public Integer addSlave(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  17.         return save(smsInfo);

  18.     }



  19. }

五:分布式事务下的数据源动态切换
情况一:事务下唯一数据源切换

点击(此处)折叠或打开

  1. @Transactional(rollbackFor={Exception.class,RuntimeException.class})

  2. @DataSource("master")

  3.     public GmcSmsInfo addMaster(GmcSmsInfo smsInfo) throws BusinessServiceException {

  4.         try {

  5.             smsInfo.setSmsId(gmcSmsInfoDaoImpl.save(smsInfo));

  6.         }

  7.         catch (FrameworkDAOException e) {

  8.             throw new BusinessServiceException(e);

  9.         }

  10.         return smsInfo;

  11.     }

@Transactional 是通过数据库的 connection 来建立事务的,为了保证数据源能顺利切换,要保证 @DataSource 优先于 @Transactional 执行。实现办法:在 DataSourceAspect 切面上增加注解 @Order(1)。此处需要了解切面的层次和执行顺序等相关知识。注意:@Transactional 和 @DataSource 标注在 service 层的同一个方法上。

情况二:事务下多数据源切换

service类

点击(此处)折叠或打开

  1. @Autowired

  2.     private GmcSmsInfoDAO gmcSmsInfoDaoImpl;


  3.     @Autowired

  4.     @Qualifier("transactionManager")

  5.     private JtaTransactionManager transactionManager;



  6.     public void addMasterAndSlave(GmcSmsInfo smsInfo) throws BusinessServiceException {

  7.         UserTransaction userTransaction = transactionManager.getUserTransaction();

  8.         try {

  9.             userTransaction.begin();

  10.             gmcSmsInfoDaoImpl.addMaster(smsInfo);

  11.             smsInfo = new GmcSmsInfo();


  12.             smsInfo.setChannel("test2");

  13.             smsInfo.setContent("test2");

  14.             smsInfo.setStatus("001");

  15.             smsInfo.setCreateDate(Calendar.getInstance().getTime());

  16.             smsInfo.setMobile("88888888");

  17.             gmcSmsInfoDaoImpl.addSlave(smsInfo);

  18.             userTransaction.commit();

  19.         }

  20.         catch (Exception e) {

  21.             try {

  22.                 userTransaction.rollback();

  23.             }

  24.             catch (IllegalStateException e1) {

  25.                 e1.printStackTrace();

  26.             }

  27.             catch (SecurityException e1) {

  28.                 e1.printStackTrace();

  29.             }

  30.             catch (SystemException e1) {

  31.                 e1.printStackTrace();

  32.             }

  33.             throw new BusinessServiceException(e);

  34.         }


  35.     }


dao实现类


点击(此处)折叠或打开

  1. /* (non-Javadoc)

  2.      * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addMaster(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  3.      */

  4.     @Override

  5.     @DataSource("master")

  6.     public Integer addMaster(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  7.         return save(smsInfo);

  8.     }


  9.     /* (non-Javadoc)

  10.      * @see com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfoDAO#addSlave(com.gemdale.ghome.business.async.deal.center.demo.GmcSmsInfo)

  11.      */

  12.     @Override

  13.     @DataSource("slave")

  14.     public Integer addSlave(GmcSmsInfo smsInfo) throws FrameworkDAOException {

  15.         return save(smsInfo);

  16.     }


注意,此处采用了编程式来实现事务,注解式暂时还没有好的解决方法,欢迎大家讨论分享。  此处的@DataSource 放在了dao的实现层。

“Jdbc怎么实现分布式事务数据源动态切换”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!