xxl-job源码解读1-调度中心启动源码分析

项目完整解读源码地址:https://gitee.com/duhai123/xxl_job_study

xxl-job源码解读1-调度中心启动源码分析

1. 调度中心启动源码分析

启动后首先是XxlJobDynamicSchedulerConfig:注册SchedulerFactoryBean 和xxl-job-admin相关类

/**
 * 注册SchedulerFactoryBean 和xxl-job-admin相关类
 * @author xuxueli 2018-10-28 00:18:17
 */
@Configuration
public class XxlJobDynamicSchedulerConfig {

	/**
	 * SchedulerFactoryBean(可以看出xxl内部使用的是quartz)
	 * @param dataSource
	 * @return
	 */
    @Bean
    public SchedulerFactoryBean getSchedulerFactoryBean(DataSource dataSource){

        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setDataSource(dataSource);
        schedulerFactory.setAutoStartup(true);                  // 自动启动
        schedulerFactory.setStartupDelay(20);                   // 延时启动,应用启动成功后在启动
        schedulerFactory.setOverwriteExistingJobs(true);        // 覆盖DB中JOB:true、以数据库中已经存在的为准:false
        schedulerFactory.setApplicationContextSchedulerContextKey("applicationContext");
        schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
        return schedulerFactory;
    }

    /**
     * 启动注册
     * @param schedulerFactory
     * @return
     */
    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobDynamicScheduler getXxlJobDynamicScheduler(SchedulerFactoryBean schedulerFactory){
        Scheduler scheduler = schedulerFactory.getScheduler();
        //base quartz scheduler util
        //定时任务工具类,在启动中做很多事:(重点)
        XxlJobDynamicScheduler xxlJobDynamicScheduler = new XxlJobDynamicScheduler();
        xxlJobDynamicScheduler.setScheduler(scheduler);

        return xxlJobDynamicScheduler;
    }
}

XxlJobDynamicScheduler:

/**
	 * 启动
	 * 
	 * @throws Exception
	 */
	public void start() throws Exception {
		// valid
		Assert.notNull(scheduler, "quartz scheduler is null");

		// init i18n
		initI18n();

		// admin registry monitor run
		// 启动自动注册线程, 获取类型为自动注册的执行器信息,完成机器的自动注册与发现
		JobRegistryMonitorHelper.getInstance().start();

		// admin monitor run
		// 启动失败日志监控线程
		JobFailMonitorHelper.getInstance().start();

		// admin-server
		// admin的服务启动:让执行器可以调用调度中心的接口
		initRpcProvider();

		logger.info(">>>>>>>>> init xxl-job admin success.");
	}

在来看看JobRegistryMonitorHelper.getInstance().start();中做了哪些事

/**
	 * 1.删除 90秒之内没有更新信息的***器(在线的执行器表数据)
	 * 2.90秒内有更新的更新到执行器信息表
	 */
	public void start(){
		//创建一个线程
		registryThread = new Thread(new Runnable() {
			@Override
			public void run() {
				// 当toStop 为false时进入该循环。
				while (!toStop) {
					try {
						// 获取类型为自动注册的执行器地址列表
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						
						if (CollectionUtils.isNotEmpty(groupList)) {

							// 删除 90秒之内没有更新信息的***器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除
							XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(RegistryConfig.DEAD_TIMEOUT);

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							
							// 查询在90秒之内有过更新的机器列表
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
							if (list != null) {
								//循环***器列表,  根据执行器不同,将这些机器列表区分拿出来
								for (XxlJobRegistry item: list) {
									//是执行器
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										
										// 获取注册的执行器 KEY(也就是执行器)
										String appName = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appName);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}
										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										// 收集 机器信息,根据执行器做区分
										appAddressMap.put(appName, registryList);
									}
								}
							}

							//  遍历执行器列表
							for (XxlJobGroup group: groupList) {
								// 通过执行器的APP_NAME  拿出他下面的集群机器地址
								List<String> registryList = appAddressMap.get(group.getAppName());
								
								String addressListStr = null;
								if (CollectionUtils.isNotEmpty(registryList)) {
									Collections.sort(registryList);
									 // 转为为String, 通过逗号分隔
									addressListStr = StringUtils.join(registryList, ",");
								}
								group.setAddressList(addressListStr);
								 // 将 这个执行器的 集群机器地址列表,写入到数据库
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						logger.error("job registry instance error:{}", e);
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						logger.error("job registry instance error:{}", e);
					}
				}
			}
		});
		registryThread.setDaemon(true);
		
		//启动线程
		registryThread.start();
	}

在来看看JobFailMonitorHelper.getInstance().start();中做了哪些事

/**
	 * 失败日志监控线程
	 */
	public void start() {
		monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {
				// monitor
				while (!toStop) {
					try {
						// 从队列中拿出所有可用的 jobLogIds
						List<Integer> jobLogIdList = new ArrayList<Integer>();
						// int drainToNum =
						// 移除此队列中所有可用的元素,并将它们添加到给定 jobLogIdList 中。
						JobFailMonitorHelper.instance.queue.drainTo(jobLogIdList);

						if (CollectionUtils.isNotEmpty(jobLogIdList)) {
							for (Integer jobLogId : jobLogIdList) {
								if (jobLogId == null || jobLogId == 0) {
									continue;
								}

								// 从数据库跟以前有日志信息
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);

								if (log == null) {
									continue;
								}

								// 任务触发成功, 但是JobHandle 还没有返回结果
								if (IJobHandler.SUCCESS.getCode() == log.getTriggerCode() && log.getHandleCode() == 0) {
									// 将 JobLogId 放入队列 , 继续监控
									JobFailMonitorHelper.monitor(jobLogId);
									logger.debug(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
								} else if (IJobHandler.SUCCESS.getCode() == log.getHandleCode()) {// 成功的
									// job success, pass
									logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
								} else {

									// 失败重试次数大于零
									if (log.getExecutorFailRetryCount() > 0) {
										// 放到job触发器线程池再试试(重试)
										JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY,
												(log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(),
												null);
										String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"
												+ I18nUtil.getString("jobconf_trigger_type_retry")
												+ "<<<<<<<<<<< </span><br>";
										log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
										// 修改日志触发信息
										XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
									}

									// 1、fail retry
									XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
											.loadById(log.getJobId());
									// 任务执行失败, 执行发送邮件等预警措施
									failAlarm(info, log);

									logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
								}
							}
						}

						TimeUnit.SECONDS.sleep(10);
					} catch (Exception e) {
						logger.error("job monitor error:{}", e);
					}
				}

				/**
				 * 停止前要做的事
				 */
				// monitor all clear
				List<Integer> jobLogIdList = new ArrayList<Integer>();
				// int drainToNum =
				getInstance().queue.drainTo(jobLogIdList);

				if (jobLogIdList != null && jobLogIdList.size() > 0) {
					for (Integer jobLogId : jobLogIdList) {
						// 主键查询日志
						XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(jobLogId);
						// 两个只要有一个失败
						if (ReturnT.FAIL_CODE == log.getTriggerCode() || ReturnT.FAIL_CODE == log.getHandleCode()) {
							// 主键查询调度任务信息
							XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao()
									.loadById(log.getJobId());
							// fail alarm 任务执行失败, 执行发送邮件等预警措施
							failAlarm(info, log);
							logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
						}
					}
				}

			}
		});
		monitorThread.setDaemon(true);

		// 日志监控
		monitorThread.start();
	}

在来看看initRpcProvider();中做了哪些事

	/**
	 * ----- TODO ----(与调度中心不同,直接设置jettyServerHandler,不像执行器那样单独启动的jetty server,共用一个web端口server.port)
	 */
	private void initRpcProvider() {
		// init
		XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
		xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0,
				XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);

		// add services:增加调度中心服务到PRC
		xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null,
				XxlJobAdminConfig.getAdminConfig().getAdminBiz());

		// jetty handler:直接设置jettyServerHandler,不像执行器那样单独启动的jetty server
		jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
	}

 

以上 是xxl-job 在启动的时候做的操作, 主要启动两个线程和admin的服务启动

  JobRegistryMonitorHelper:用来监控自动注册上来的机器,达到自动注册的目的

  JobFailMonitorHelper:监控任务的执行状态, 如若失败,则发送邮件预警

xxl-job 是基于quartz 进行的二次开发,在系统启动的时候,quartz框架会自动去数据库读取相关的配置信息,载入相关定时器信息