Quartz源码分析(一)------ 以线程等待的方式实现按时间调度

Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:

Quartz源码分析(一)------ 以线程等待的方式实现按时间调度

其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:

  1. publicvoidrun(){
  2. booleanlastAcquireFailed=false;
  3. while(!halted){
  4. try{
  5. //checkifwe'resupposedtopause...
  6. synchronized(pauseLock){
  7. while(paused&&!halted){
  8. try{
  9. //waituntiltogglePause(false)iscalled...
  10. pauseLock.wait(100L);
  11. }catch(InterruptedExceptionignore){
  12. }
  13. }
  14. if(halted){
  15. break;
  16. }
  17. }
  18. ......
  19. }
  20. }

以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:

  1. Triggertrigger=null;
  2. longnow=System.currentTimeMillis();
  3. signaled=false;
  4. try{
  5. trigger=qsRsrcs.getJobStore().acquireNextTrigger(
  6. ctxt,now+idleWaitTime);
  7. lastAcquireFailed=false;
  8. }catch(JobPersistenceExceptionjpe){
  9. if(!lastAcquireFailed){
  10. qs.notifySchedulerListenersError(
  11. "Anerroroccuredwhilescanningforthenexttriggertofire.",
  12. jpe);
  13. }
  14. lastAcquireFailed=true;
  15. }catch(RuntimeExceptione){
  16. if(!lastAcquireFailed){
  17. getLog().error("quartzSchedulerThreadLoop:RuntimeException"
  18. +e.getMessage(),e);
  19. }
  20. lastAcquireFailed=true;
  21. }

这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:

  1. now=System.currentTimeMillis();
  2. longtriggerTime=trigger.getNextFireTime().getTime();
  3. longtimeUntilTrigger=triggerTime-now;
  4. longspinInterval=10;
  5. intnumPauses=(int)(timeUntilTrigger/spinInterval);
  6. while(numPauses>=0&&!signaled){
  7. try{
  8. Thread.sleep(spinInterval);
  9. }catch(InterruptedExceptionignore){
  10. }
  11. now=System.currentTimeMillis();
  12. timeUntilTrigger=triggerTime-now;
  13. numPauses=(int)(timeUntilTrigger/spinInterval);
  14. }
  15. if(signaled){
  16. try{
  17. qsRsrcs.getJobStore().releaseAcquiredTrigger(
  18. ctxt,trigger);
  19. }catch(JobPersistenceExceptionjpe){
  20. qs.notifySchedulerListenersError(
  21. "Anerroroccuredwhilereleasingtrigger'"
  22. +trigger.getFullName()+"'",
  23. jpe);
  24. //dbconnectionmusthavefailed...keep
  25. //retryinguntilit'sup...
  26. releaseTriggerRetryLoop(trigger);
  27. }catch(RuntimeExceptione){
  28. getLog().error(
  29. "releaseTriggerRetryLoop:RuntimeException"
  30. +e.getMessage(),e);
  31. //dbconnectionmusthavefailed...keep
  32. //retryinguntilit'sup...
  33. releaseTriggerRetryLoop(trigger);
  34. }
  35. signaled=false;
  36. continue;
  37. }

此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:

  1. importorg.quartz.core.JobRunShell;
  2. JobRunShellshell=null;
  3. try{
  4. shell=qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
  5. shell.initialize(qs,bndle);
  6. }catch(SchedulerExceptionse){
  7. try{
  8. qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  9. trigger,bndle.getJobDetail(),Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  10. }catch(SchedulerExceptionse2){
  11. qs.notifySchedulerListenersError(
  12. "Anerroroccuredwhileplacingjob'striggersinerrorstate'"
  13. +trigger.getFullName()+"'",se2);
  14. //dbconnectionmusthavefailed...keepretrying
  15. //untilit'sup...
  16. errorTriggerRetryLoop(bndle);
  17. }
  18. continue;
  19. }
  20. if(qsRsrcs.getThreadPool().runInThread(shell)==false){
  21. try{
  22. getLog().error("ThreadPool.runInThread()returnfalse!");
  23. qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
  24. trigger,bndle.getJobDetail(),Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
  25. }catch(SchedulerExceptionse2){
  26. qs.notifySchedulerListenersError(
  27. "Anerroroccuredwhileplacingjob'striggersinerrorstate'"
  28. +trigger.getFullName()+"'",se2);
  29. //dbconnectionmusthavefailed...keepretrying
  30. //untilit'sup...
  31. releaseTriggerRetryLoop(trigger);
  32. }
  33. }

此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。

之后的代码就是清扫战场,就不在累述。