SpringBoot成长笔记(十二)集成Quartz实现分布式定时任务

一、环境搭建

  • Quartz官网下载
    http://www.quartz-scheduler.org/downloads/

  • 解压后进入 D:\07-myTutorial\quartz\quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables 目录,在MySQL中执行初始化脚本 tables_mysql_innodb.sql(MySQL使用InnoDB引擎的版本),创建Quartz所需的数据表。

二、springboot配置文件(quartz.properties)

#============================================================================
# Configure JobStore
# Using Spring datasource in QuartzConfiguration.java
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
#============================================================================
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.txIsolationLevelReadCommitted = true
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Configure Main Scheduler Properties
# Needed to manage cluster instances
#============================================================================
org.quartz.scheduler.instanceName = ClusterQuartz
org.quartz.scheduler.instanceId= AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

#============================================================================
# Configure ThreadPool
# Can also be configured in spring configuration
#============================================================================
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount = 5
#org.quartz.threadPool.threadPriority = 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

三、代码

package com.znv.framework.configuration;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * Spring configuration that assembles the Quartz {@link SchedulerFactoryBean}.
 *
 * <p>The scheduler is built from the injected {@link DataSource} and
 * {@link PlatformTransactionManager}, the settings read from
 * {@value #QUARTZ_PROPERTIES_PATH} on the classpath, a dedicated thread pool,
 * and a job factory that autowires every job instance it creates.
 *
 * @author MHm
 * @date 2018/12/27.
 */
@Configuration
public class QuartzConfiguration {
    /** Classpath location of the Quartz settings file. */
    public static final String QUARTZ_PROPERTIES_PATH = "quartz.properties";

    @Autowired
    private DataSource dataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Creates the job factory used by the scheduler to instantiate jobs.
     *
     * @param applicationContext context whose bean factory autowires the created jobs
     * @return a job factory that injects Spring dependencies into each job instance
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext) {
        AutowiringSpringBeanJobFactory autowiringFactory = new AutowiringSpringBeanJobFactory();
        autowiringFactory.setApplicationContext(applicationContext);
        return autowiringFactory;
    }

    /**
     * Builds the scheduler factory from the collaborators declared in this class.
     *
     * @param jobFactory factory used to instantiate job classes
     * @return the configured, auto-starting scheduler factory
     * @throws IOException if the Quartz properties file cannot be read
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) throws IOException {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();

        // Persistence and transaction handling come from the Spring-managed beans.
        schedulerFactory.setDataSource(dataSource);
        schedulerFactory.setTransactionManager(transactionManager);

        // Quartz settings and the thread pool that runs the jobs.
        schedulerFactory.setQuartzProperties(quartzProperties());
        schedulerFactory.setTaskExecutor(schedulerThreadPool());

        schedulerFactory.setJobFactory(jobFactory);
        schedulerFactory.setAutoStartup(true);
        return schedulerFactory;
    }

    /**
     * Loads the Quartz settings from {@value #QUARTZ_PROPERTIES_PATH}.
     *
     * @return the properties read from the classpath resource
     * @throws IOException if the resource cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        ClassPathResource settingsLocation = new ClassPathResource(QUARTZ_PROPERTIES_PATH);
        PropertiesFactoryBean loader = new PropertiesFactoryBean();
        loader.setLocation(settingsLocation);
        // The factory bean is created by hand here, so its initialization callback
        // has to be invoked explicitly before the properties object is requested.
        loader.afterPropertiesSet();
        return loader.getObject();
    }

    /**
     * Thread pool handed to the scheduler factory as its task executor.
     *
     * @return an executor with 15 core threads, at most 25 threads and a queue of 100 tasks
     */
    @Bean
    public Executor schedulerThreadPool() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(15);
        pool.setMaxPoolSize(25);
        pool.setQueueCapacity(100);
        return pool;
    }

    /**
     * Job factory that autowires each job instance after the parent factory has created it,
     * so that job classes can use injected Spring beans.
     */
    public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

        private transient AutowireCapableBeanFactory beanFactory;

        /**
         * Remembers the autowire-capable bean factory of the given context.
         *
         * @param context the application context supplying the bean factory
         */
        @Override
        public void setApplicationContext(final ApplicationContext context) {
            this.beanFactory = context.getAutowireCapableBeanFactory();
        }

        /**
         * Creates the job through the parent factory and then injects its dependencies.
         *
         * @param bundle the fired-trigger bundle describing the job to create
         * @return the autowired job instance
         * @throws Exception if the parent factory fails to create the job
         */
        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            final Object instance = super.createJobInstance(bundle);
            this.beanFactory.autowireBean(instance);
            return instance;
        }
    }
}

package com.znv.framework.component;

import com.znv.framework.task.ScheduledJob;
import org.quartz.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

/**
 * Bean post-processor that registers {@link ScheduledJob}-annotated {@link Job} beans
 * with the Quartz {@link Scheduler}.
 *
 * <p>For each such bean a job detail and a cron trigger are built from the annotation
 * values and scheduled, unless a job with the same key is already known to the scheduler.
 *
 * <p>NOTE(review): this post-processor autowires the {@link Scheduler}, so the scheduler
 * and its dependencies are presumably created early in the context lifecycle — confirm
 * that this ordering is acceptable for the application.
 *
 * @author MHm
 * @date 2018/12/27.
 */
@Component
public class ScheduleListener implements BeanPostProcessor {
    @Autowired
    private Scheduler scheduler;

    /**
     * Performs no work before initialization.
     *
     * @param bean     the bean being created
     * @param beanName the name of the bean
     * @return the unchanged bean
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Schedules the bean's job class when the bean is a {@link Job} annotated with
     * {@link ScheduledJob} and no job with the same key exists yet.
     *
     * @param bean     the fully initialized bean
     * @param beanName the name of the bean
     * @return the unchanged bean
     * @throws IllegalStateException if the scheduler rejects the registration for any reason
     *                               other than the job already existing
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        ScheduledJob scheduledJob = AnnotationUtils.findAnnotation(bean.getClass(), ScheduledJob.class);
        if (scheduledJob == null || !(bean instanceof Job)) {
            return bean;
        }

        JobKey jobKey = new JobKey(scheduledJob.name(), scheduledJob.group());

        JobDetail jobDetail = JobBuilder.newJob(((Job) bean).getClass())
                .withIdentity(jobKey)
                .build();

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(scheduledJob.name() + "Trigger", scheduledJob.group())
                .forJob(jobDetail)
                .withSchedule(CronScheduleBuilder.cronSchedule(scheduledJob.cronExp()))
                .build();

        try {
            if (!scheduler.checkExists(jobKey)) {
                scheduler.scheduleJob(jobDetail, trigger);
            }
        } catch (ObjectAlreadyExistsException ignored) {
            // The existence check and the registration are two separate scheduler calls, so
            // another scheduler instance sharing the job store may register the same job in
            // between. The job exists either way, which is the desired end state.
        } catch (SchedulerException e) {
            // Fail fast: printing the stack trace and continuing would leave the application
            // running with a job that is never scheduled.
            throw new IllegalStateException(
                    "Failed to schedule job " + jobKey + " for bean '" + beanName + "'", e);
        }
        return bean;
    }
}

package com.znv.framework.task;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Type-level marker carrying the scheduling metadata of a job class:
 * its name, its group and its cron expression.
 *
 * <p>The annotation is retained at runtime so that it can be read reflectively.
 *
 * @author MHm
 * @date 2018/12/27.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ScheduledJob {

    /**
     * @return the job name (required)
     */
    String name();

    /**
     * @return the cron expression describing when the job fires (required)
     */
    String cronExp();

    /**
     * @return the job group; {@code "DEFAULT_GROUP"} when not specified
     */
    String group() default "DEFAULT_GROUP";
}

package com.znv.framework.task;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Sample job that writes a line containing the current date to standard output
 * each time it is executed.
 *
 * <p>The job is declared as a Spring component, carries its name and cron expression
 * in {@link ScheduledJob}, and is marked {@link DisallowConcurrentExecution}.
 *
 * @author MHm
 * @date 2018/12/27.
 */
@ScheduledJob(name = "JobA", cronExp = "*/10 * * * * ?")
@DisallowConcurrentExecution
@Component
public class QuartzJob extends QuartzJobBean {

    /**
     * Prints a blank line followed by {@code Quartz job } and the current date.
     *
     * @param context the execution context supplied by Quartz (not used)
     * @throws JobExecutionException declared by the parent contract; not thrown here
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        final Date firedAt = new Date();
        final String line = "\nQuartz job " + firedAt;
        System.out.println(line);
    }
}

四、运行

SpringBoot成长笔记(十二)集成Quartz实现分布式定时任务