xxl-job任务操作源码分析(四)

手动执行任务

页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/trigger   post 请求

param: id = 任务ID

controller最终会调用service的方法进行处理


public ReturnT<String> triggerJob(int id) {
        // 从数据库中查询该任务的具体信息
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
       if (xxlJobInfo == null) {
           return new ReturnT<String>(ReturnT.FAIL_CODE, "任务不存在" );
        }
       String group = String.valueOf(xxlJobInfo.getJobGroup());
       String name = String.valueOf(xxlJobInfo.getId());
 
   try {
        // 调用执行器类,触发该任务 ,
      XxlJobDynamicScheduler.triggerJob(name, group);
      return ReturnT.SUCCESS;
   catch (SchedulerException e) {
      logger.error(e.getMessage(), e);
      return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
   }
}
// XxlJobDynamicScheduler 中的triggerJob方法
public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException {
   // TriggerKey : name + group
   JobKey jobKey = new JobKey(jobName, jobGroup);
     
    boolean result = false;
    if (checkExists(jobName, jobGroup)) {
        // 调用quartz的Scheduler来触发任务
        scheduler.triggerJob(jobKey);
        result = true;
        logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey);
    else {
       logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey);
    }
    return result;
}


暂停任务

页面上点击“暂停” 按钮, 前端会发送一个请求 /jobinfo/pause  post 请求

param: id = 任务ID

controller最终会调用service的方法进行处理


XxlJobServiceImpl
@Override
public ReturnT<String> pause(int id) {
        // 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
       XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
       String group = String.valueOf(xxlJobInfo.getJobGroup());
       String name = String.valueOf(xxlJobInfo.getId());
 
   try {
           // 调用quartz操作类来暂停任务
           boolean ret = XxlJobDynamicScheduler.pauseJob(name, group); //
           return ret?ReturnT.SUCCESS:ReturnT.FAIL;
   catch (SchedulerException e) {
      logger.error(e.getMessage(), e);
      return ReturnT.FAIL;
   }
}
 
 
// XxlJobDynamicScheduler 中的pauseJob方法
public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException {
   // TriggerKey : name + group
   TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
     
    boolean result = false;
    if (checkExists(jobName, jobGroup)) {
        // 暂停任务
        scheduler.pauseTrigger(triggerKey);
        result = true;
        logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey);
    else {
       logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey);
    }
    return result;
}


恢复任务

页面上点击“恢复” 按钮, 前端会发送一个请求 /jobinfo/resume  post 请求

param: id = 任务ID

controller最终会调用service的方法进行处理

XxlJobServiceImpl
@Override
public ReturnT<String> resume(int id) {
        // 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
       XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
       String group = String.valueOf(xxlJobInfo.getJobGroup());
       String name = String.valueOf(xxlJobInfo.getId());
 
   try {
    // 恢复任务
      boolean ret = XxlJobDynamicScheduler.resumeJob(name, group);
      return ret?ReturnT.SUCCESS:ReturnT.FAIL;
   catch (SchedulerException e) {
      logger.error(e.getMessage(), e);
      return ReturnT.FAIL;
   }
}
 
 
// XxlJobDynamicScheduler 中的resumeJob方法
public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException {
   // TriggerKey : name + group
   TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
     
    boolean result = false;
    if (checkExists(jobName, jobGroup)) {
        // 暂停任务
        scheduler.resumeTrigger(triggerKey);
        result = true;
        logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey);
    else {
       logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey);
    }
    return result;
}

删除任务

页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/remove  post 请求

param: id = 任务ID

controller最终会调用service的方法进行处理

XxlJobServiceImpl
public ReturnT<String> remove(int id) {
   XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
       String group = String.valueOf(xxlJobInfo.getJobGroup());
       String name = String.valueOf(xxlJobInfo.getId());
 
   try {
     //调用quartz删除他内置的定时器
      XxlJobDynamicScheduler.removeJob(name, group);
      // 删除数据库中的任务
      xxlJobInfoDao.delete(id);
      // 删除调度日志
      xxlJobLogDao.delete(id);
      // 如果是脚本类型的任务,则删除脚本变化日志
      xxlJobLogGlueDao.deleteByJobId(id);
      return ReturnT.SUCCESS;
   catch (SchedulerException e) {
      logger.error(e.getMessage(), e);
   }
   return ReturnT.FAIL;
}


终止任务

在调度日志的日志列表页面,正在执行者中的任务,可以手动进行终止。 前端会发送  /joblog/logKill? id= 日志ID


JobLogController
@RequestMapping("/logKill")
@ResponseBody
public ReturnT<String> logKill(int id){
   // 从数据库中获取该日志信息
   XxlJobLog log = xxlJobLogDao.load(id);
  // 获取该任务的信息
   XxlJobInfo jobInfo = xxlJobInfoDao.loadById(log.getJobId());
   if (jobInfo==null) {
      return new ReturnT<String>(500"任务不存在");
   }
   if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {
      return new ReturnT<String>(500"任务没有触发成功,无需终止");
   }
 
   // request of kill
   ReturnT<String> runResult = null;
   try {
      // 通过NetComClientProxy创建代理对象,代理对象invoke方法里面包含了HTTP请求,会将该请求发送至执行器那一端。
      // 通过执行器来终止该任务 , 下面主要来看一下执行器那边的kill方法。
      ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(log.getExecutorAddress());
      runResult = executorBiz.kill(jobInfo.getId());
   catch (Exception e) {
      logger.error(e.getMessage(), e);
      runResult = new ReturnT<String>(500, e.getMessage());
   }
 
   if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
      log.setHandleCode(ReturnT.FAIL_CODE);
      log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():""));
      log.setHandleTime(new Date());
      xxlJobLogDao.updateHandleInfo(log);
      return new ReturnT<String>(runResult.getMsg());
   else {
      return new ReturnT<String>(500, runResult.getMsg());
   }
}
ExecutorBizImpl

@Override
public ReturnT<String> kill(int jobId) {
    // 从线程池里面根据该任务ID,获取对应的线程
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread != null) {
        // 线程存在,则手动移除 ,下面可以看一下remove方法
        XxlJobExecutor.removeJobThread(jobId, "人工手动终止");
        return ReturnT.SUCCESS;
    }
 
    return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
}
public static void removeJobThread(int jobId, String removeOldReason){
    // 从线程池(ConcurrentHashMap)中移除该队列
    JobThread oldJobThread = JobThreadRepository.remove(jobId);
    if (oldJobThread != null) {
        //发送stop信息,线程的run方法,发现该信息,则会停止运行,并记录日志
        oldJobThread.toStop(removeOldReason);
        // 发送中断信息
        oldJobThread.interrupt();
    }
}

xxl-job任务操作源码分析(四)