您的位置:www.85058.com > 技术支持 > quartz定时任务框架调度机制解析

quartz定时任务框架调度机制解析

发布时间:2019-10-18 08:38编辑:技术支持浏览(122)

    原标题:如何高效排查日均调度量超两百万次的重复调度问题? | 技术头条

     

    图片 1

    转自集群调度机制调研及源码分析

    作者 | 余慧娟

    quartz2.2.1集群调度机制调研及源码分析
    引言
    quartz集群架构
    调度器实例化
    调度过程
    触发器的获取
    触发trigger:
    Job执行过程:
    总结:
    附:

    责编 | 郭芮

     

    系统自从改用Quartz做任务调度后,一日的调度量均在两百万次以上。随着调度量的增加,突然开始出现job重复调度的情况,且没有规律可循。网上也没有说得较为清楚的解决办法,于是我们开始调试Quartz源码,并最终找到了问题所在。

    引言

    quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于quratz集群方案的讨论:http://www.iteye.com/topic/40970 ITeye创始人@Robbin在8楼给出了自己对quartz集群应用方案的意见.

    后来有人总结了三种quratz集群方案:http://www.iteye.com/topic/114965

    1.单独启动一个Job Server来跑job,不部署在web容器中.其他web节点当需要启动异步任务的时候,可以通过种种方式(DB, JMS, Web Service, etc)通知Job Server,而Job Server收到这个通知之后,把异步任务加载到自己的任务队列中去。

    2.独立出一个job server,这个server上跑一个spring+quartz的应用,这个应用专门用来启动任务。在jobserver上加上hessain,得到业务接口,这样jobserver就可以调用web container中的业务操作,也就是正真执行任务的还是在cluster中的tomcat。在jobserver启动定时任务之后,轮流调用各地址上的业务操作(类似apache分发tomcat一样),这样可以让不同的定时任务在不同的节点上运行,减低了一台某个node的压力

    3.quartz本身事实上也是支持集群的。在这种方案下,cluster上的每一个node都在跑quartz,然后也是通过数据中记录的状态来判断这个操作是否正在执行,这就要求cluster上所有的node的时间应该是一样的。而且每一个node都跑应用就意味着每一个node都需要有自己的线程池来跑quartz.

    总的来说,第一种方法,在单独的server上执行任务,对任务的适用范围有很大的限制,要访问在web环境中的各种资源非常麻烦.但是集中式的管理容易从架构上规避了分布式环境的种种同步问题.第二种方法在在第一种方法的基础上减轻了jobserver的重量,只发送调用请求,不直接执行任务,这样解决了独立server无法访问web环境的问题,而且可以做到节点的轮询.可以有效地均衡负载.第三种方案是quartz自身支持的集群方案,在架构上完全是分布式的,没有集中的管理,quratz通过数据库锁以及标识字段保证多个节点对任务不重复获取,并且有负载平衡机制和容错机制,用少量的冗余,换取了高可用性(high avilable HA)和高可靠性.(个人认为和git的机制有异曲同工之处,分布式的冗余设计,换取可靠性和速度).

    本文旨在研究quratz为解决分布式任务调度中存在的防止重复执行和负载均衡等问题而建立的机制.以调度流程作为顺序,配合源码理解其中原理.

    quratz的配置,及具体应用请参考CRM项目组的另一篇文章:CRM使用Quartz集群总结分享

    如果没有耐性看完源码解析,可以直接拉到文章最末,有直接简单的解决办法。本文中使用的Quartz版本为2.3.0,且使用JDBC模式存储Job。

    quartz集群架构

    图片 2

    quartz的分布式架构如上图,可以看到数据库是各节点上调度器的枢纽.各个节点并不感知其他节点的存在,只是通过数据库来进行间接的沟通.

    实际上,quartz的分布式策略就是一种以数据库作为边界资源的并发策略.每个节点都遵守相同的操作规范,使得对数据库的操作可以串行执行.而不同名称的调度器又可以互不影响的并行运行.

    组件间的通讯图如下:(*注:主要的sql语句附在文章最后)

    图片 3

    quartz运行时由QuartzSchedulerThread类作为主体,循环执行调度流程。JobStore作为中间层,按照quartz的并发策略执行数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放入线程池运行。LockHandler负责获取LOCKS表中的数据库锁。

    整个quartz对任务调度的时序大致如下:

    图片 4

    梳理一下其中的流程,可以表示为:

    0.调度器线程run()

    1.获取待触发trigger

        1.1数据库LOCKS表TRIGGER_ACCESS行加锁

        1.2读取JobDetail信息

        1.3读取trigger表中触发器信息并标记为"已获取"

        1.4commit事务,释放锁

    2.触发trigger

        2.1数据库LOCKS表STATE_ACCESS行加锁

        2.2确认trigger的状态

        2.3读取trigger的JobDetail信息

        2.4读取trigger的Calendar信息

        2.3更新trigger信息

        2.3commit事务,释放锁

    3实例化并执行Job

        3.1从线程池获取线程执行JobRunShell的run方法

    可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执行操作前获取锁 操作完成后释放锁.这一规则可以看做是quartz解决集群问题的核心思想.

    规则流程图:

    图片 5

    进一步解释这条规则就是:一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ2_LOCKS表中对应当前调度器的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度器实例获取.

    集群中的每一个调度器实例都遵循这样一种严格的操作规程,那么对于同一类调度器来说,每个实例对数据库的操作只能是串行的.而不同名的调度器之间却可以并行执行.

    下面我们深入源码,从微观上观察quartz集群调度的细节

    准备

    调度器实例化

    一个最简单的quartz helloworld应用如下:

    图片 6

    public class HelloWorldMain {
        Log log = LogFactory.getLog(HelloWorldMain.class);
    
        public void run() {
            try {
                //取得Schedule对象
                SchedulerFactory sf = new StdSchedulerFactory();
                Scheduler sch = sf.getScheduler(); 
    
                JobDetail jd = new JobDetail("HelloWorldJobDetail",Scheduler.DEFAULT_GROUP,HelloWorldJob.class);
                Trigger tg = TriggerUtils.makeMinutelyTrigger(1);
                tg.setName("HelloWorldTrigger");
    
                sch.scheduleJob(jd, tg);
                sch.start();
            } catch ( Exception e ) {
                e.printStackTrace();
    
            }
        }
        public static void main(String[] args) {
            HelloWorldMain hw = new HelloWorldMain();
            hw.run();
        }
    }
    

    图片 7

    我们看到初始化一个调度器需要用工厂类获取实例:

    SchedulerFactory sf = new StdSchedulerFactory();
    Scheduler sch = sf.getScheduler(); 

    然后启动:

    sch.start();
    下面跟进StdSchedulerFactory的getScheduler()方法:
    

    图片 8

    public Scheduler getScheduler() throws SchedulerException {
            if (cfg == null) {
                initialize();
            }
            SchedulerRepository schedRep = SchedulerRepository.getInstance();
            //从"调度器仓库"中根据properties的SchedulerName配置获取一个调度器实例
            Scheduler sched = schedRep.lookup(getSchedulerName());
            if (sched != null) {
                if (sched.isShutdown()) {
                    schedRep.remove(getSchedulerName());
                } else {
                    return sched;
                }
            }
            //初始化调度器
            sched = instantiate();
            return sched;
        }
    

    图片 9

    跟进初始化调度器方法sched = instantiate();发现是一个700多行的初始化方法,涉及到

    • 读取配置资源,
    • 生成QuartzScheduler对象,
    • 创建该对象的运行线程,并启动线程;
    • 初始化JobStore,QuartzScheduler,DBConnectionManager等重要组件,
      至此,调度器的初始化工作已完成,初始化工作中quratz读取了数据库中存放的对应当前调度器的锁信息,对应CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS两个LOCK_NAME.

    图片 10

    public void initialize(ClassLoadHelper loadHelper,
                SchedulerSignaler signaler) throws SchedulerConfigException {
            if (dsName == null) {
                throw new SchedulerConfigException("DataSource name not set.");
            }
            classLoadHelper = loadHelper;
            if(isThreadsInheritInitializersClassLoadContext()) {
                log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
                initializersLoader = Thread.currentThread().getContextClassLoader();
            }
    
            this.schedSignaler = signaler;
            // If the user hasn't specified an explicit lock handler, then
            // choose one based on CMT/Clustered/UseDBLocks.
            if (getLockHandler() == null) {
    
                // If the user hasn't specified an explicit lock handler,
                // then we *must* use DB locks with clustering
                if (isClustered()) {
                    setUseDBLocks(true);
                }
    
                if (getUseDBLocks()) {
                    if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                        if(getSelectWithLockSQL() == null) {
                            //读取数据库LOCKS表中对应当前调度器的锁信息
                            String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                            getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
                            setSelectWithLockSQL(msSqlDflt);
                        }
                    }
                    getLog().info("Using db table-based data access locking (synchronization).");
                    setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
                } else {
                    getLog().info(
                        "Using thread monitor-based data access locking (synchronization).");
                    setLockHandler(new SimpleSemaphore());
                }
            }
        }
    

    图片 11

    当调用sch.start();方法时,scheduler做了如下工作:

    1.通知listener开始启动

    2.启动调度器线程

    3.启动plugin

    4.通知listener启动完成

    图片 12

    public void start() throws SchedulerException {
            if (shuttingDown|| closed) {
                throw new SchedulerException(
                        "The Scheduler cannot be restarted after shutdown() has been called.");
            }
            // QTZ-212 : calling new schedulerStarting() method on the listeners
            // right after entering start()
            //通知该调度器的listener启动开始
            notifySchedulerListenersStarting();
            if (initialStart == null) {
                initialStart = new Date();
                //启动调度器的线程
                this.resources.getJobStore().schedulerStarted();            
                //启动plugins
                startPlugins();
            } else {
                resources.getJobStore().schedulerResumed();
            }
            schedThread.togglePause(false);
            getLog().info(
                    "Scheduler " + resources.getUniqueIdentifier() + " started.");
            //通知该调度器的listener启动完成
            notifySchedulerListenersStarted();
        }
    

    图片 13

    首先,因为本文是代码级别的分析文章,因而需要提前了解Quartz的用途和用法,网上有很多不错的文章,可以提前自行了解。

    调度过程

    调度器启动后,调度器的线程就处于运行状态了,开始执行quartz的主要工作–调度任务.

    前面已介绍过,任务的调度过程大致分为三步:

    1.获取待触发trigger

    2.触发trigger

    3.实例化并执行Job

    下面分别分析三个阶段的源码.

    QuartzSchedulerThread是调度器线程类,调度过程的三个步骤就承载在run()方法中,分析见代码注释:

    按 Ctrl+C 复制代码

    按 Ctrl+C 复制代码

    调度器每次获取到的trigger是30s内需要执行的,所以要等待一段时间至trigger执行前2ms.在等待过程中涉及到一个新加进来更紧急的trigger的处理逻辑.分析写在注释中,不再赘述.

    可以看到调度器的只要在运行状态,就会不停地执行调度流程.值得注意的是,在流程的最后线程会等待一个随机的时间.这就是quartz自带的负载平衡机制.

    以下是三个步骤的跟进:

    其次,在用法之外,我们还需要了解一些Quartz框架的基础概念:

    触发器的获取

    调度器调用:

    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

    在数据库中查找一定时间范围内将会被触发的trigger.参数的意义如下:参数1:nolaterthan = now+3000ms,即未来30s内将会被触发.参数2 最大获取数量,大小取线程池线程剩余量与定义值得较小者.参数3 时间窗口 默认为0,程序会在nolaterthan后加上窗口大小来选择trigger.quratz会在每次触发trigger后计算出trigger下次要执行的时间,并在数据库QRTZ2_TRIGGERS中的NEXT_FIRE_TIME字段中记录.查找时将当前毫秒数与该字段比较,就能找出下一段时间内将会触发的触发器.查找时,调用在JobStoreSupport类中的方法:

    图片 14

    public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
            throws JobPersistenceException {
    
            String lockName;
            if(isAcquireTriggersWithinLock() || maxCount > 1) {
                lockName = LOCK_TRIGGER_ACCESS;
            } else {
                lockName = null;
            }
            return executeInNonManagedTXLock(lockName,
                    new TransactionCallback<List<OperableTrigger>>() {
                        public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                            return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                        }
                    },
                    new TransactionValidator<List<OperableTrigger>>() {
                        public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                            //...异常处理回调方法
                        }
                    });
        }
    

    图片 15

    该方法关键的一点在于执行了executeInNonManagedTXLock()方法,这一方法指定了一个锁名,两个回调函数.在开始执行时获得锁,在方法执行完毕后随着事务的提交锁被释放.在该方法的底层,使用 for update语句,在数据库中加入行级锁,保证了在该方法执行过程中,其他的调度器对trigger进行获取时将会等待该调度器释放该锁.此方法是前面介绍的quartz集群策略的的具体实现,这一模板方法在后面的trigger触发过程还会被使用.

    public static final String SELECT_FOR_LOCK = "SELECT * FROM "
                + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
                " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    进一步解释:quratz在获取数据库资源之前,先要以for update方式访问LOCKS表中相应LOCK_NAME数据将改行锁定.如果在此前该行已经被锁定,那么等待,如果没有被锁定,那么读取满足要求的trigger,并把它们的status置为STATE_ACQUIRED,如果有tirgger已被置为STATE_ACQUIRED,那么说明该trigger已被别的调度器实例认领,无需再次认领,调度器会忽略此trigger.调度器实例之间的间接通信就体现在这里.

    JobStoreSupport.acquireNextTrigger()方法中:

    int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);

    最后释放锁,这时如果下一个调度器在排队获取trigger的话,则仍会执行相同的步骤.这种机制保证了trigger不会被重复获取.按照这种算法正常运行状态下调度器每次读取的trigger中会有相当一部分已被标记为被获取.

    获取trigger的过程进行完毕.

    • Quartz把触发job叫做fire。TRIGGERSTATE是当前trigger的状态,PREVFIRE_TIME是上一次触发的时间,NEXTFIRETIME是下一次触发的时间,misfire是指这个job在某一时刻要触发、却因为某些原因没有触发的情况。
    • Quartz在运行时,会起两类线程(不止两类),一类用于调度job的调度线程(单线程),一类是用于执行job具体业务的工作池。
    • Quartz自带的表里面,本文将涉及其中3张表:

    触发trigger:

    QuartzSchedulerThread line336:

    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

    调用JobStoreSupport类的triggersFired()方法:

    图片 16

    public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
            return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
                    new TransactionCallback<List<TriggerFiredResult>>() {
                        public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                            List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
                            TriggerFiredResult result;
                            for (OperableTrigger trigger : triggers) {
                                try {
                                  TriggerFiredBundle bundle = triggerFired(conn, trigger);
                                  result = new TriggerFiredResult(bundle);
                                } catch (JobPersistenceException jpe) {
                                    result = new TriggerFiredResult(jpe);
                                } catch(RuntimeException re) {
                                    result = new TriggerFiredResult(re);
                                }
                                results.add(result);
                            }
                            return results;
                        }
                    },
                    new TransactionValidator<List<TriggerFiredResult>>() {
                        @Override
                        public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                            //...异常处理回调方法
                        }
                    });
        }
    

    图片 17

    此处再次用到了quratz的行为规范:executeInNonManagedTXLock()方法,在获取锁的情况下对trigger进行触发操作.其中的触发细节如下:

    图片 18

    protected TriggerFiredBundle triggerFired(Connection conn,
                OperableTrigger trigger)
            throws JobPersistenceException {
            JobDetail job;
            Calendar cal = null;
            // Make sure trigger wasn't deleted, paused, or completed...
            try { // if trigger was deleted, state will be STATE_DELETED
                String state = getDelegate().selectTriggerState(conn,
                        trigger.getKey());
                if (!state.equals(STATE_ACQUIRED)) {
                    return null;
                }
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't select trigger state: "
                        + e.getMessage(), e);
            }
            try {
                job = retrieveJob(conn, trigger.getJobKey());
                if (job == null) { return null; }
            } catch (JobPersistenceException jpe) {
                try {
                    getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                    getDelegate().updateTriggerState(conn, trigger.getKey(),
                            STATE_ERROR);
                } catch (SQLException sqle) {
                    getLog().error("Unable to set trigger state to ERROR.", sqle);
                }
                throw jpe;
            }
            if (trigger.getCalendarName() != null) {
                cal = retrieveCalendar(conn, trigger.getCalendarName());
                if (cal == null) { return null; }
            }
            try {
                getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
            } catch (SQLException e) {
                throw new JobPersistenceException("Couldn't insert fired trigger: "
                        + e.getMessage(), e);
            }
            Date prevFireTime = trigger.getPreviousFireTime();
            // call triggered - to update the trigger's next-fire-time state...
            trigger.triggered(cal);
            String state = STATE_WAITING;
            boolean force = true;
    
            if (job.isConcurrentExectionDisallowed()) {
                state = STATE_BLOCKED;
                force = false;
                try {
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_BLOCKED, STATE_WAITING);
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_BLOCKED, STATE_ACQUIRED);
                    getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                            STATE_PAUSED_BLOCKED, STATE_PAUSED);
                } catch (SQLException e) {
                    throw new JobPersistenceException(
                            "Couldn't update states of blocked triggers: "
                                    + e.getMessage(), e);
                }
            }
    
            if (trigger.getNextFireTime() == null) {
                state = STATE_COMPLETE;
                force = true;
            }
            storeTrigger(conn, trigger, job, true, state, force, false);
            job.getJobDataMap().clearDirtyFlag();
            return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                    .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                    .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
        }
    

    图片 19

    该方法做了以下工作:

    1.获取trigger当前状态

    2.通过trigger中的JobKey读取trigger包含的Job信息

    3.将trigger更新至触发状态

    4.结合calendar的信息触发trigger,涉及多次状态更新

    5.更新数据库中trigger的信息,包括更改状态至STATE_COMPLETE,及计算下一次触发时间.

    6.返回trigger触发结果的数据传输类TriggerFiredBundle

     

    从该方法返回后,trigger的执行过程已基本完毕.回到执行quratz操作规范的executeInNonManagedTXLock方法,将数据库锁释放.

    trigger触发操作完成

    Job执行过程:

    再回到线程类QuartzSchedulerThread的 line353这时触发器都已出发完毕,job的详细信息都已就位

    QuartzSchedulerThread line:368

     

    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
    shell.initialize(qs);

     

    为每个Job生成一个可运行的RunShell,并放入线程池运行.

    在最后调度线程生成了一个随机的等待时间,进入短暂的等待,这使得其他节点的调度器都有机会获取数据库资源.如此就实现了quratz的负载平衡.

    这样一次完整的调度过程就结束了.调度器线程进入下一次循环.

    • triggers表。triggers表里记录了某个trigger的PREVFIRETIME(上次触发时间),NEXT_FIRETIME(下一次触发时间),TRIGGERSTATE(当前状态)。虽未尽述,但是本文用到的只有这些。
    • locks表。Quartz支持分布式,也就是会存在多个线程同时抢占相同资源的情况,而Quartz正是依赖这张表处理这种状况,具体见下文。
    • fired_triggers表。记录正在触发的triggers信息。

    总结:

    简单地说,quartz的分布式调度策略是以数据库为边界资源的一种异步策略.各个调度器都遵守一个基于数据库锁的操作规则保证了操作的唯一性.同时多个节点的异步运行保证了服务的可靠.但这种策略有自己的局限性.摘录官方文档中对quratz集群特性的说明:

    Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers. 

    The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

    说明指出,集群特性对于高cpu使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种情况随着节点的增加会越来越严重.

    附:

    通讯图中关键步骤的主要sql语句:

    图片 20

    3.
    select TRIGGER_ACCESS from QRTZ2_LOCKS for update
    4.
    SELECT TRIGGER_NAME,
    TRIGGER_GROUP,
    NEXT_FIRE_TIME,
    PRIORITY
    FROM QRTZ2_TRIGGERS
    WHERE SCHEDULER_NAME = 'CRMscheduler'
    AND TRIGGER_STATE = 'ACQUIRED'
    AND NEXT_FIRE_TIME <= '{timekey 30s latter}'
    AND ( MISFIRE_INSTR = -1
    OR ( MISFIRE_INSTR != -1
    AND NEXT_FIRE_TIME >= '{timekey now}' ) )
    ORDER BY NEXT_FIRE_TIME ASC,
    PRIORITY DESC;
    5.
    SELECT *
    FROM QRTZ2_JOB_DETAILS
    WHERE SCHEDULER_NAME = CRMscheduler
    AND JOB_NAME = ?
    AND JOB_GROUP = ?;
    6.
    UPDATE TQRTZ2_TRIGGERS
    SET TRIGGER_STATE = 'ACQUIRED'
    WHERE SCHED_NAME = 'CRMscheduler'
    AND TRIGGER_NAME = '{triggerName}'
    AND TRIGGER_GROUP = '{triggerGroup}'
    AND TRIGGER_STATE = 'waiting';
    7.
    INSERT INTO QRTZ2_FIRED_TRIGGERS
    (SCHEDULER_NAME,
    ENTRY_ID,
    TRIGGER_NAME,
    TRIGGER_GROUP,
    INSTANCE_NAME,
    FIRED_TIME,
    SCHED_TIME,
    STATE,
    JOB_NAME,
    JOB_GROUP,
    IS_NONCONCURRENT,
    REQUESTS_RECOVERY,
    PRIORITY)
    VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    8.
    commit;
    12.
    select STAT_ACCESS from QRTZ2_LOCKS for update
    13.
    SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
    14.
    SELECT TRIGGER_STATE
    FROM QRTZ2_TRIGGERS
    WHERE SCHEDULER_NAME = 'CRMscheduler'
    AND TRIGGER_NAME = ?
    AND TRIGGER_GROUP = ?;
    14.
    SELECT *
    FROM QRTZ2_JOB_DETAILS
    WHERE SCHEDULER_NAME = CRMscheduler
    AND JOB_NAME = ?
    AND JOB_GROUP = ?;
    15.
    SELECT *
    FROM QRTZ2_CALENDARS
    WHERE SCHEDULER_NAME = 'CRMscheduler'
    AND CALENDAR_NAME = ?;
    16.
    UPDATE QRTZ2_FIRED_TRIGGERS
    SET INSTANCE_NAME = ?,
    FIRED_TIME = ?,
    SCHED_TIME = ?,
    ENTRY_STATE = ?,
    JOB_NAME = ?,
    JOB_GROUP = ?,
    IS_NONCONCURRENT = ?,
    REQUESTS_RECOVERY = ?
    WHERE SCHEDULER_NAME = 'CRMscheduler'
    AND ENTRY_ID = ?;
    17.
    UPDATE TQRTZ2_TRIGGERS
    SET TRIGGER_STATE = ?
    WHERE SCHED_NAME = 'CRMscheduler'
    AND TRIGGER_NAME = '{triggerName}'
    AND TRIGGER_GROUP = '{triggerGroup}'
    AND TRIGGER_STATE = ?;
    18.
    UPDATE QRTZ2_TRIGGERS
    SET JOB_NAME = ?,
    JOB_GROUP = ?,
    DESCRIPTION = ?,
    NEXT_FIRE_TIME = ?,
    PREV_FIRE_TIME = ?,
    TRIGGER_STATE = ?,
    TRIGGER_TYPE = ?,
    START_TIME = ?,
    END_TIME = ?,
    CALENDAR_NAME = ?,
    MISFIRE_INSTRUCTION = ?,
    PRIORITY = ?,
    JOB_DATAMAP = ?
    WHERE SCHEDULER_NAME = SCHED_NAME_SUBST
    AND TRIGGER_NAME = ?
    AND TRIGGER_GROUP = ?;
    19.
    commit;
    

    图片 21

    原文地址:http://demo.netfoucs.com/gklifg/article/details/27090179

    • TRIGGER_STATE,也就是trigger的状态,主要有以下几类:

    图片 22

    图 1 trigger状态变化图

    trigger的初始状态是WAITING,处于WAITING状态的trigger等待被触发。调度线程会不停地扫triggers表,根据NEXTFIRETIME提前拉取即将触发的trigger,如果这个trigger被该调度线程拉取到,它的状态就会变为ACQUIRED。因为是提前拉取trigger,并未到达trigger真正的触发时刻,所以调度线程会等到真正触发的时刻,再将trigger状态由ACQUIRED改为EXECUTING。如果这个trigger不再执行,就将状态改为COMPLETE,否则为WAITING,开始新的周期。如果这个周期中的任何环节抛出异常,trigger的状态会变成ERROR。如果手动暂停这个trigger,状态会变成PAUSED。

    开始排查

    分布式状态下的数据访问

    前文提到,trigger的状态储存在数据库,Quartz支持分布式,所以如果起了多个Quartz服务,会有多个调度线程来抢夺触发同一个trigger。MySQL在默认情况下执行select 语句,是不上锁的,那么如果同时有1个以上的调度线程抢到同一个trigger,是否会导致这个trigger重复调度呢?我们来看看,Quartz是如何解决这个问题的。

    首先,我们先来看下JobStoreSupport类的executeInNonManagedTXLock()方法:

    图片 23

    图 2 executeInNonManagedTXLock方法的具体实现

    这个方法的官方介绍:

    /**

    * Executethe given callback having acquired the given lock.

    *Depending onthe JobStore,the surrounding transaction maybe

    *assumed tobe already present(managed).

    *

    *@param lockName The name of the lock toacquire, forexample

    * "TRIGGER_ACCESS". Ifnull, thenno lock isacquired ,but the

    *lockCallback isstill executed ina transaction.

    */

    也就是说,传入的callback方法在执行过程中携带了指定的锁,并开启了事务,注释也提到,lockName就是指定的锁的名字,如果lockName是空的,那么callback方法的执行不在锁的保护下,但依然在事务中。

    这意味着,我们使用这个方法,不仅可以保证事务,还可以选择保证callback方法的线程安全。

    接下来,我们来看一下executeInNonManagedTXLock(…)中的obtainLock(conn,lockName)方法,即抢锁的过程。这个方法是在Semaphore接口中定义的,Semaphore接口通过锁住线程或者资源,来保护资源不被其他线程修改,由于我们的调度信息是存在数据库的,所以现在查看DBSemaphore.java中obtainLock方法的具体实现:

    图片 24

    图 3 obtainLock方法具体实现

    我们通过调试查看expandedSQL和expandedInsertSQL这两个变量:

    图片 25

    图 4 expandedSQL和expandedInsertSQL的具体内容

    图4可以看出,obtainLock方法通过locks表的一个行锁(lockName确定)来保证callback方法的事务和线程安全。拿到锁后,obtainLock方法将lockName写入threadlocal。当然在releaseLock的时候,会将lockName从threadlocal中删除。

    总而言之,executeInNonManagedTXLock()方法保证了在分布式的情况下,同一时刻只有一个线程可以执行这个方法。

    Quartz的调度过程

    图片 26

    本文由www.85058.com发布于技术支持,转载请注明出处:quartz定时任务框架调度机制解析

    关键词: