xxl-job任务操作源码分析(四)
手动执行任务
页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/trigger post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
public ReturnT<String> triggerJob( int id) {
// 从数据库中查询该任务的具体信息
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
if (xxlJobInfo == null ) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "任务不存在" );
}
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 调用执行器类,触发该任务 ,
XxlJobDynamicScheduler.triggerJob(name, group);
return ReturnT.SUCCESS;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
} // XxlJobDynamicScheduler 中的triggerJob方法 public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
JobKey jobKey = new JobKey(jobName, jobGroup);
boolean result = false ;
if (checkExists(jobName, jobGroup)) {
// 调用quartz的Scheduler来触发任务
scheduler.triggerJob(jobKey);
result = true ;
logger.info( ">>>>>>>>>>> runJob success, jobKey:{}" , jobKey);
} else {
logger.info( ">>>>>>>>>>> runJob fail, jobKey:{}" , jobKey);
}
return result;
} |
暂停任务
页面上点击“暂停” 按钮, 前端会发送一个请求 /jobinfo/pause post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
XxlJobServiceImpl
@Override public ReturnT<String> pause( int id) {
// 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 调用quartz操作类来暂停任务
boolean ret = XxlJobDynamicScheduler.pauseJob(name, group); //
return ret?ReturnT.SUCCESS:ReturnT.FAIL;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return ReturnT.FAIL;
}
} // XxlJobDynamicScheduler 中的pauseJob方法 public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
boolean result = false ;
if (checkExists(jobName, jobGroup)) {
// 暂停任务
scheduler.pauseTrigger(triggerKey);
result = true ;
logger.info( ">>>>>>>>>>> pauseJob success, triggerKey:{}" , triggerKey);
} else {
logger.info( ">>>>>>>>>>> pauseJob fail, triggerKey:{}" , triggerKey);
}
return result;
} |
恢复任务
页面上点击“恢复” 按钮, 前端会发送一个请求 /jobinfo/resume post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
XxlJobServiceImpl
@Override public ReturnT<String> resume( int id) {
// 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 恢复任务
boolean ret = XxlJobDynamicScheduler.resumeJob(name, group);
return ret?ReturnT.SUCCESS:ReturnT.FAIL;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return ReturnT.FAIL;
}
} // XxlJobDynamicScheduler 中的resumeJob方法 public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
boolean result = false ;
if (checkExists(jobName, jobGroup)) {
// 暂停任务
scheduler.resumeTrigger(triggerKey);
result = true ;
logger.info( ">>>>>>>>>>> resumeJob success, triggerKey:{}" , triggerKey);
} else {
logger.info( ">>>>>>>>>>> resumeJob fail, triggerKey:{}" , triggerKey);
}
return result;
} |
删除任务
页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/remove post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
XxlJobServiceImpl
public ReturnT<String> remove( int id) {
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
//调用quartz删除他内置的定时器
XxlJobDynamicScheduler.removeJob(name, group);
// 删除数据库中的任务
xxlJobInfoDao.delete(id);
// 删除调度日志
xxlJobLogDao.delete(id);
// 如果是脚本类型的任务,则删除脚本变化日志
xxlJobLogGlueDao.deleteByJobId(id);
return ReturnT.SUCCESS;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
}
return ReturnT.FAIL;
} |
终止任务
在调度日志的日志列表页面,正在执行者中的任务,可以手动进行终止。 前端会发送 /joblog/logKill? id= 日志ID
JobLogController
@RequestMapping ( "/logKill" )
@ResponseBody public ReturnT<String> logKill( int id){
// 从数据库中获取该日志信息
XxlJobLog log = xxlJobLogDao.load(id);
// 获取该任务的信息
XxlJobInfo jobInfo = xxlJobInfoDao.loadById(log.getJobId());
if (jobInfo== null ) {
return new ReturnT<String>( 500 , "任务不存在" );
}
if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {
return new ReturnT<String>( 500 , "任务没有触发成功,无需终止" );
}
// request of kill
ReturnT<String> runResult = null ;
try {
// 通过NetComClientProxy创建代理对象,代理对象invoke方法里面包含了HTTP请求,会将该请求发送至执行器那一端。
// 通过执行器来终止该任务 , 下面主要来看一下执行器那边的kill方法。
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(log.getExecutorAddress());
runResult = executorBiz.kill(jobInfo.getId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>( 500 , e.getMessage());
}
if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
log.setHandleCode(ReturnT.FAIL_CODE);
log.setHandleMsg( I18nUtil.getString( "joblog_kill_log_byman" )+ ":" + (runResult.getMsg()!= null ?runResult.getMsg(): "" ));
log.setHandleTime( new Date());
xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(runResult.getMsg());
} else {
return new ReturnT<String>( 500 , runResult.getMsg());
}
} |
ExecutorBizImpl
@Override public ReturnT<String> kill( int jobId) {
// 从线程池里面根据该任务ID,获取对应的线程
JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
if (jobThread != null ) {
// 线程存在,则手动移除 ,下面可以看一下remove方法
XxlJobExecutor.removeJobThread(jobId, "人工手动终止" );
return ReturnT.SUCCESS;
}
return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed." );
} public static void removeJobThread( int jobId, String removeOldReason){
// 从线程池(ConcurrentHashMap)中移除该队列
JobThread oldJobThread = JobThreadRepository.remove(jobId);
if (oldJobThread != null ) {
//发送stop信息,线程的run方法,发现该信息,则会停止运行,并记录日志
oldJobThread.toStop(removeOldReason);
// 发送中断信息
oldJobThread.interrupt();
}
}
|