Quartz源码分析(一)------ 以线程等待的方式实现按时间调度
Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:
其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:
- publicvoidrun(){
- booleanlastAcquireFailed=false;
- while(!halted){
- try{
- //checkifwe'resupposedtopause...
- synchronized(pauseLock){
- while(paused&&!halted){
- try{
- //waituntiltogglePause(false)iscalled...
- pauseLock.wait(100L);
- }catch(InterruptedExceptionignore){
- }
- }
- if(halted){
- break;
- }
- }
- ......
- }
- }
以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:
- Triggertrigger=null;
- longnow=System.currentTimeMillis();
- signaled=false;
- try{
- trigger=qsRsrcs.getJobStore().acquireNextTrigger(
- ctxt,now+idleWaitTime);
- lastAcquireFailed=false;
- }catch(JobPersistenceExceptionjpe){
- if(!lastAcquireFailed){
- qs.notifySchedulerListenersError(
- "Anerroroccuredwhilescanningforthenexttriggertofire.",
- jpe);
- }
- lastAcquireFailed=true;
- }catch(RuntimeExceptione){
- if(!lastAcquireFailed){
- getLog().error("quartzSchedulerThreadLoop:RuntimeException"
- +e.getMessage(),e);
- }
- lastAcquireFailed=true;
- }
这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:
- now=System.currentTimeMillis();
- longtriggerTime=trigger.getNextFireTime().getTime();
- longtimeUntilTrigger=triggerTime-now;
- longspinInterval=10;
- intnumPauses=(int)(timeUntilTrigger/spinInterval);
- while(numPauses>=0&&!signaled){
- try{
- Thread.sleep(spinInterval);
- }catch(InterruptedExceptionignore){
- }
- now=System.currentTimeMillis();
- timeUntilTrigger=triggerTime-now;
- numPauses=(int)(timeUntilTrigger/spinInterval);
- }
- if(signaled){
- try{
- qsRsrcs.getJobStore().releaseAcquiredTrigger(
- ctxt,trigger);
- }catch(JobPersistenceExceptionjpe){
- qs.notifySchedulerListenersError(
- "Anerroroccuredwhilereleasingtrigger'"
- +trigger.getFullName()+"'",
- jpe);
- //dbconnectionmusthavefailed...keep
- //retryinguntilit'sup...
- releaseTriggerRetryLoop(trigger);
- }catch(RuntimeExceptione){
- getLog().error(
- "releaseTriggerRetryLoop:RuntimeException"
- +e.getMessage(),e);
- //dbconnectionmusthavefailed...keep
- //retryinguntilit'sup...
- releaseTriggerRetryLoop(trigger);
- }
- signaled=false;
- continue;
- }
此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:
- importorg.quartz.core.JobRunShell;
- JobRunShellshell=null;
- try{
- shell=qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
- shell.initialize(qs,bndle);
- }catch(SchedulerExceptionse){
- try{
- qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
- trigger,bndle.getJobDetail(),Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
- }catch(SchedulerExceptionse2){
- qs.notifySchedulerListenersError(
- "Anerroroccuredwhileplacingjob'striggersinerrorstate'"
- +trigger.getFullName()+"'",se2);
- //dbconnectionmusthavefailed...keepretrying
- //untilit'sup...
- errorTriggerRetryLoop(bndle);
- }
- continue;
- }
- if(qsRsrcs.getThreadPool().runInThread(shell)==false){
- try{
- getLog().error("ThreadPool.runInThread()returnfalse!");
- qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
- trigger,bndle.getJobDetail(),Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
- }catch(SchedulerExceptionse2){
- qs.notifySchedulerListenersError(
- "Anerroroccuredwhileplacingjob'striggersinerrorstate'"
- +trigger.getFullName()+"'",se2);
- //dbconnectionmusthavefailed...keepretrying
- //untilit'sup...
- releaseTriggerRetryLoop(trigger);
- }
- }
此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。
之后的代码就是清扫战场,就不在累述。