定时任务重启后执行策略_quartz定时任务框架调度机制解析

定时任务重启后执⾏策略_quartz定时任务框架调度机制解析quartz2.2.1集调度机制调研及源码分析
引⾔
quartz集架构
调度器实例化
调度过程
触发器的获取
触发trigger:
Job执⾏过程:
总结:
附:
引⾔
1.单独启动⼀个Job Server来跑job,不部署在web容器中.其他web节点当需要启动异步任务的时候,可以通过种种⽅式(DB, JMS, Web Service, etc)通知Job Server,⽽Job Server收到这个通知之后,把异步任务加载到⾃⼰的任务队列中去。
2.独⽴出⼀个job server,这个server上跑⼀个spring+quartz的应⽤,这个应⽤专门⽤来启动任务。在jobserver上加上hessain,得到业务接⼝,这样jobserver就可以调⽤web container中的业务操作,也就是正真执⾏任务的还是在cluster中的tomcat。在jobserver启动定时任务之后,轮流调⽤各地址上的业务操作(类似apache分发tomcat⼀样),这样可以让不同的定时任务在不同的节点上运⾏,减低了⼀台某个node的压⼒
3.quartz本⾝事实上也是⽀持集的。在这种⽅案下,cluster上的每⼀个node都在跑quartz,然后也是通过数据中记录的状态来判断这个操作是否正在执⾏,这就要求cluster上所有的node的时间应该是⼀样的。⽽且每⼀个node都跑应⽤就意味着每⼀个node都需要有⾃⼰的线程池来跑quartz.
总的来说,第⼀种⽅法,在单独的server上执⾏任务,对任务的适⽤范围有很⼤的限制,要访问在web环境中的各种资源⾮常⿇烦.但是集中式的管理容易从架构上规避了分布式环境的种种同步问题.第⼆种⽅法在在第⼀种⽅法的基础上减轻了jobserver的重量,只发送调⽤请求,不直接执⾏任务,这样解决了独⽴server⽆法访问web环境的问题,⽽且可以做到节点的轮询.可以有效地均衡负载.第三种⽅案
是quartz⾃⾝⽀持的集⽅案,在架构上完全是分布式的,没有集中的管理,quratz通过数据库锁以及标识字段保证多个节点对任务不重复获取,并且有负载平衡机制和容错机制,⽤少量的冗余,换取了⾼可⽤性(high avilable HA)和⾼可靠性.(个⼈认为和git的机制有异曲同⼯之处,分布式的冗余设计,换取可靠性和速度).
本⽂旨在研究quratz为解决分布式任务调度中存在的防⽌重复执⾏和负载均衡等问题⽽建⽴的机制.以调度流程作为顺序,配合源码理解其中原理.
quratz的配置,及具体应⽤请参考CRM项⽬组的另⼀篇⽂章:CRM使⽤Quartz集总结分享
quartz集架构
quartz的分布式架构如上图,可以看到数据库是各节点上调度器的枢纽.各个节点并不感知其他节点的存在,只是通过数据库来进⾏间接的沟通.
实际上,quartz的分布式策略就是⼀种以数据库作为边界资源的并发策略.每个节点都遵守相同的操作规范,使得对数据库的操作可以串⾏执⾏.⽽不同名称的调度器⼜可以互不影响的并⾏运⾏.
组件间的通讯图如下:(*注:主要的sql语句附在⽂章最后)
quartz运⾏时由QuartzSchedulerThread类作为主体,循环执⾏调度流程。JobStore作为中间层,按照quartz的并发策略执⾏数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放⼊线程池运⾏。LockHandler负责获取LOCKS表中的数据库锁。
整个quartz对任务调度的时序⼤致如下:
白板说
梳理⼀下其中的流程,可以表⽰为:
0.调度器线程run()
1.获取待触发trigger
1.1数据库LOCKS表TRIGGER_ACCESS⾏加锁
1.2读取JobDetail信息
1.3读取trigger表中触发器信息并标记为"已获取"
1.4commit事务,释放锁
2.触发trigger
2.1数据库LOCKS表STATE_ACCESS⾏加锁
2.2确认trigger的状态
2.3读取trigger的JobDetail信息
2.4读取trigger的Calendar信息
2.3更新trigger信息
2.3commit事务,释放锁
3实例化并执⾏Job
3.1从线程池获取线程执⾏JobRunShell的run⽅法
可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执⾏操作前获取锁 操作完成后释放锁.这⼀规则可以看做是quartz解决集问题的核⼼思想.
规则流程图:
进⼀步解释这条规则就是:⼀个调度器实例在执⾏涉及到分布式问题的数据库操作前,⾸先要获取QUARTZ2_LOCKS表中对应当前调度器的⾏级锁,获取锁后即可执⾏其他表中的数据库操作,随着操作事务的提交,⾏级锁被释放,供其他调度器实例获取.
集中的每⼀个调度器实例都遵循这样⼀种严格的操作规程,那么对于同⼀类调度器来说,每个实例对数据库的操作只能是串⾏的.⽽不同名的调度器之间却可以并⾏执⾏.
下⾯我们深⼊源码,从微观上观察quartz集调度的细节统括保单
调度器实例化
⼀个最简单的quartz helloworld应⽤如下:
public class HelloWorldMain {
Log log = Log(HelloWorldMain.class);
public void run() {
try {
/人均国民生产总值
/取得Schedule对象
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sch = sf.getScheduler();
JobDetail jd = new JobDetail("HelloWorldJobDetail",Scheduler.DEFAULT_GROUP,HelloWorldJob.class);
Trigger tg = TriggerUtils.makeMinutelyTrigger(1);
tg.setName("HelloWorldTrigger");
sch.scheduleJob(jd, tg);
sch.start();
} catch ( Exception e ) {
e.printStackTrace();
}
}
public static void main(String[] args) {
HelloWorldMain hw = new HelloWorldMain();
hw.run();
}
}
我们看到初始化⼀个调度器需要⽤⼯⼚类获取实例:
然后启动:
下⾯跟进StdSchedulerFactory的getScheduler()⽅法:
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
initialize();
}
SchedulerRepository schedRep = Instance();
//从"调度器仓库"中根据properties的SchedulerName配置获取⼀个调度器实例Scheduler sched = schedRep.lookup(getSchedulerName());
if (sched != null) {
if (sched.isShutdown()) {
} else {
return sched;
}
}
//初始化调度器
sched = instantiate();
return sched;
}
跟进初始化调度器⽅法sched = instantiate();发现是⼀个700多⾏的初始化⽅法,涉及到
读取配置资源,
心跳设计师
⽣成QuartzScheduler对象,
创建该对象的运⾏线程,并启动线程;
初始化JobStore,QuartzScheduler,DBConnectionManager等重要组件,
⾄此,调度器的初始化⼯作已完成,初始化⼯作中quratz读取了数据库中存放的对应当前调度器的锁信息,对应CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS两个LOCK_NAME.
public void initialize(ClassLoadHelper loadHelper,
SchedulerSignaler signaler) throws SchedulerConfigException {
if (dsName == null) {
throw new SchedulerConfigException("DataSource name not set.");
}
classLoadHelper = loadHelper;
if(isThreadsInheritInitializersClassLoadContext()) {
log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName()); initializersLoader = Thread.currentThread().getContextClassLoader();
}
this.schedSignaler = signaler;
// If the user hasn't specified an explicit lock handler, then
// choose one based on CMT/Clustered/UseDBLocks.
if (getLockHandler() == null) {
北洋海军兴亡史// If the user hasn't specified an explicit lock handler,
// then we *must* use DB locks with clustering
gpu集
if (isClustered()) {
setUseDBLocks(true);
}
if (getUseDBLocks()) {
if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(Name())) {
if(getSelectWithLockSQL() == null) {
//读取数据库LOCKS表中对应当前调度器的锁信息
String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
setSelectWithLockSQL(msSqlDflt);
}
}
getLog().info("Using db table-based data access locking (synchronization).");
setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL())); } else {
getLog().info(
"Using thread monitor-based data access locking (synchronization).");
setLockHandler(new SimpleSemaphore());
}
}
}
当调⽤sch.start();⽅法时,scheduler做了如下⼯作:
1.通知listener开始启动
2.启动调度器线程
3.启动plugin
4.通知listener启动完成
public void start() throws SchedulerException {
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
// QTZ-212 : calling new schedulerStarting() method on the listeners
// right after entering start()
//通知该调度器的listener启动开始
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
//启动调度器的线程
//启动plugins
startPlugins();
} else {

本文发布于:2024-09-22 01:51:57,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/284534.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:调度   操作   数据库   任务   节点   分布式   集群   启动
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议