1.可动态配置的源码Schedule设计
2.一文带你搞懂xxl-job(分布式任务调度平台)
3.SpringBoot动态定时任务的实现
可动态配置的Schedule设计
1.背景
定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的源码注册用户数量、渠道来源进行统计,源码并以邮件报表的源码方式发送给相关人员。相信这样的源码需求,每个开发伙伴都处理过。源码领啦 源码
你可以使用Linux的源码Crontab启动应用程序进行处理,或者直接使用Spring的源码Schedule对任务进行调度,还可以使用分布式调度系统,源码如果xxl-job等。源码相信你已经轻车熟路、源码习以为常。源码直到有一天你接到了一个新需求:
1.新建一组任务,源码周期性的源码执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、源码停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从以下几部分进行讨论:
1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2.SpringScheduleSpringBoot上的Schedule的使用非常简单,无需增加新的文件病毒源码依赖,只需简单配置即可。
1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;
首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。
@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}runTask任务延迟1s进行初始化,并以5s为间隔进行调度。
Scheduled注解类的详细配置如下:
配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致环境搭建完成,让我们开始第一个方案。
3.数据库定时轮询使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1.串行执行方案整体思路非常简单,流程如下:
主要分如下几步:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay=,app oa源码initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。
例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。
究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2.并行执行方案整体执行流程如下:
相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,德州源码信息抛弃最老的任务;
核心代码如下:
Spring调度任务,每1秒运行一次:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}自定义线程池,每个线程池最多只有一个线程,空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}最后,在线程池中运行的Task如下:
privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。
TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。
publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:
publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。
CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:
scheduler.schedule(task,newCronTrigger("-**MON-FRI"));基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1数据库动态配置方案整体设计如下:
仍旧是轮询数据库方式,详细流程如下:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的python整站源码向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}核心辅助类:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2配置中心通知方案整体设计如下:
核心流程如下:
1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。
由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。
5.分布式环境下应用以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用Redis的分布式锁。
5.1.环境搭建Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。
首先,在pom中引入RedissonStarter。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1然后,在application.properties文件中增加Redis配置,具体如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3备注:
Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。
6.小结本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
References[1]源码:/litao/books/tree/master/configurable-schedule
一文带你搞懂xxl-job(分布式任务调度平台)
本篇文章主要记录项目中遇到的 xxl-job 的实战,希望能通过这篇文章告诉读者们什么是 xxl-job 以及怎么使用 xxl-job 并分享一个实战案例。
xxl-job 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。设计思想是将调度行为抽象形成 调度中心 平台,平台本身不承担业务逻辑,而是负责发起 调度请求 后,由 执行器 接收调度请求并执行 任务,这里的 任务 抽象为 分散的 JobHandler。通过这种方式即可实现 调度 与 任务 相互解耦,从而提高系统整体的稳定性和拓展性。
任务调度指的是系统在约定的指定时间自动去执行指定的任务的过程。在开发项目时大家可能遇到过类似的场景问题,如系统需要定时在每天0点进行数据备份、活动开始前几小时预热执行一些前置业务、定时对 MQ 消息表的发送装填等。这些场景问题都可以通过任务调度 来解决。
单体系统 中有许多实现 任务调度 的方式,如多线程方式、Timer 类、Spring Tasks 等等。这里比较常用的是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解可以自定义定时任务,有兴趣的可以去了解一下)。
在分布式系统下,每个服务都可以搭建为集群,这样的好处是可以将任务切片分给每一个服务从而实现并行执行,提高任务调度的处理效率。那么为什么分布式系统 不能使用 单体系统 的任务调度实现方式呢?在集群服务下,如果还是使用每台机器按照单体系统的任务调度实现方式实现的话,会出现下面这四个问题:怎么做到对任务的控制(如何避免任务重复执行)、如果某台机器宕机了,会不会存在任务丢失、如果要增加服务实例,怎么做到弹性扩容、如何做到对任务调度的执行情况统一监测。通过上面的问题可以了解到分布式系统下需要一个满足高可用、容错管理、负载均衡等功能的任务调度平台来实现任务调度。
xxl-job 分布式任务调度系统是一个开源软件,可以在 github 或 gitee 上查看和下载 xxl-job 的源码。在 docker 下安装 xxl-job、创建映射容器的文件目录、在/mydata/xxl-job 的目录下创建 application.properties 文件、导入 tables_xxl-job.sql 文件到指定的数据库、配置参数如数据库位置、访问口令等。
在 Spring Boot 项目中,导入 xxl-job 的 maven 依赖,配置application.yml 文件指定调度中心地址、访问口令、执行器名称和端口等属性,编写配置类配置自定义任务和执行器,完成 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程。
实战案例:当前项目需要对上传到分布式文件系统 minio 中的视频文件进行统一格式的视频转码操作。利用 xxl-job 的方式以任务调度的方式定时处理视频转码操作,以任务调度的方式,可以使得视频转码操作不会阻塞主线程,避免影响主要业务的吞吐量;以集群服务分片接收任务的方式,可以将任务均分给每个机器使得任务调度可以并行执行,提高总任务处理时间以及降低单台机器 CPU 的开销。
xxl-job 执行流程图:在集群部署时,配置路由策略中选择分片广播的方式,可以使一次任务调度会广播触发集群中所有的执行器执行一次任务,并且可以向系统传递分片参数。利用这一特性可以根据当前执行器的分片序号和分片总数来获取对应的任务记录。通过 Bean 模式(基于方法)获取分片序号和分片总数,编写 sql 获取任务记录,实现对集群服务均分任务的操作。
确保任务不会被重复消费:通过幂等性实现,依靠任务的状态(未处理1;处理中2;处理失败3;处理成功4)通过比较和设置的方式只有在状态为未处理或处理失败时才能设置为处理中,避免多个执行器同时处理该任务。设置调度过期策略和阻塞处理策略保证真正的幂等性。
编写完成所有任务:分片视频转码处理,通过分片广播拿到的参数以取模的方式获取当前执行器所属的任务记录集合,遍历集合并发执行任务,使用乐观锁抢占当前任务,执行任务过程包含分布式文件系统下载、视频转码、上传转码后的视频、更新任务状态(处理成功),使用 JUC 工具类 CountDownLatch 实现所有任务执行完后才退出方法,中间使用 xxl-job 的日志记录错误信息和执行结果。
清理任务表中转码成功的任务的记录并将其插入任务历史表,视频补偿机制处理任务超时情况下的任务,做出补偿,处理失败次数大于3次的任务,做出补偿。测试并查看日志,准备好的任务表记录,启动三台媒资服务器,并开启任务,可以单独查看每个任务的日志,通过日志中的执行日志查看具体日志信息,可以看到直接为了测试改错的路径导致下载视频出错,查看数据库表的变化,核心的视频转码任务执行成功,并且逻辑正确,能够起到分布式任务调度的作用。
SpringBoot动态定时任务的实现
1. Spring 定时任务的简单实现
在Spring Boot中使用定时任务,只需要@EnableScheduling开启定时任务支持,在需要调度的方法上添加@Scheduled注解。这样就能够在项目中开启定时调度功能了,支持通过cron、fixedRate、fixedDelay等灵活的控制执行周期和频率。
1.1 缺点周期一旦指定,想要更改必须要重启应用
1.2 需求热更新定时任务的执行周期,基于cron表达式并支持外部存储,如数据库,nacos等
最小改造兼容现有的定时任务(仅需添加一个注解)
动态增加定时任务
2.Spring 定时任务源码分析2.1 @EnableScheduling 引入了配置类 SchedulingConfiguration
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(SchedulingConfiguration.class)@Documentedpublic@interfaceEnableScheduling{ }2.2 SchedulingConfiguration只配置了一个bean,ScheduledAnnotationBeanPostProcessor从名字就知道该类实现BeanPostProcessor接口
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicclassSchedulingConfiguration{ @Bean(name=TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)publicScheduledAnnotationBeanPostProcessorscheduledAnnotationProcessor(){ returnnewScheduledAnnotationBeanPostProcessor();}}2.3 ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization实现,可见具体处理@Scheduled实现定时任务的是processScheduled方法
@OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName){ if(beaninstanceofAopInfrastructureBean||beaninstanceofTaskScheduler||beaninstanceofScheduledExecutorService){ //IgnoreAOPinfrastructuresuchasscopedproxies.returnbean;}Class<?>targetClass=AopProxyUtils.ultimateTargetClass(bean);if(!this.nonAnnotatedClasses.contains(targetClass)&&AnnotationUtils.isCandidateClass(targetClass,Arrays.asList(Scheduled.class,Schedules.class))){ //获取bean的方法及@Scheduled映射关系Map<Method,Set<Scheduled>>annotatedMethods=MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>)method->{ Set<Scheduled>scheduledMethods=AnnotatedElementUtils.getMergedRepeatableAnnotations(method,Scheduled.class,Schedules.class);return(!scheduledMethods.isEmpty()?scheduledMethods:null);});if(annotatedMethods.isEmpty()){ this.nonAnnotatedClasses.add(targetClass);if(logger.isTraceEnabled()){ logger.trace("No@Scheduledannotationsfoundonbeanclass:"+targetClass);}}else{ //Non-emptysetofmethodsannotatedMethods.forEach((method,scheduledMethods)->//处理@Scheduled注解scheduledMethods.forEach(scheduled->processScheduled(scheduled,method,bean)));if(logger.isTraceEnabled()){ logger.trace(annotatedMethods.size()+"@Scheduledmethodsprocessedonbean'"+beanName+"':"+annotatedMethods);}}}returnbean;}2.4 以下仅贴出ScheduledAnnotationBeanPostProcessor.processScheduled处理cron表达式的关键实现,
privatefinalScheduledTaskRegistrarregistrar;publicScheduledAnnotationBeanPostProcessor(){ this.registrar=newScheduledTaskRegistrar();}protectedvoidprocessScheduled(Scheduledscheduled,Methodmethod,Objectbean){ try{ //将定时任务方法,转为RunnableRunnablerunnable=createRunnable(bean,method);booleanprocessedSchedule=false;Set<ScheduledTask>tasks=newLinkedHashSet<>(4);//Determineinitialdelay//处理scheduled.initialDelay()的值,略过...//CheckcronexpressionStringcron=scheduled.cron();if(StringUtils.hasText(cron)){ Stringzone=scheduled.zone();if(this.embeddedValueResolver!=null){ //${ }变量值表达式的转换cron=this.embeddedValueResolver.resolveStringValue(cron);zone=this.embeddedValueResolver.resolveStringValue(zone);}if(StringUtils.hasLength(cron)){ Assert.isTrue(initialDelay==-1,"'initialDelay'notsupportedforcrontriggers");processedSchedule=true;if(!Scheduled.CRON_DISABLED.equals(cron)){ TimeZonetimeZone;if(StringUtils.hasText(zone)){ timeZone=StringUtils.parseTimeZoneString(zone);}else{ timeZone=TimeZone.getDefault();}//创建cron触发器CronTrigger对象,并注册CronTasktasks.add(this.registrar.scheduleCronTask(newCronTask(runnable,newCronTrigger(cron,timeZone))));}}}//处理fixedDelay和fixedRate,及ScheduledTask保存用于销毁,略过...}//略过catchException...}以上通过this.registrar.scheduleCronTask实现cron定时任务注册或初始化
3.动态定时任务的实现实现思路: 重写ScheduledAnnotationBeanPostProcessor.processScheduled方法,修改处理cron的部分代码,使用this.registrar.scheduleTriggerTask注册或初始化定时任务
3.1 相关类图classDiagramDisposableBean<|--DynamicCronScheduleTaskManagerEnvironmentAware<|--EnvironmentDynamicCronHandlerAbstractDynamicCronHandler<|--EnvironmentDynamicCronHandlerTrigger<|--DynamicCronTriggerEnvironmentAware:+setEnvironment()DisposableBean:+destroy()voidTrigger:+nextExecutionTime(TriggerContexttriggerContext)DateclassDynamicCronScheduleTaskManager{ +Map<String,ScheduledTask>dynamicScheduledTaskMap-ScheduledTaskRegistrarregistrar+addTriggerTask(StringcronName,TriggerTasktask)ScheduledTask+contains(StringcronName)boolean+updateTriggerTask(StringcronName)void+removeTriggerTask(StringcronName)void}classAbstractDynamicCronHandler{ -DynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;+getCronExpression(StringcronName)String+updateTriggerTash(StringcronName)void}classEnvironmentDynamicCronHandler{ +Environmentenvironment+environmentChangeEvent(EnvironmentChangeEventevent)void}classDynamicCronTrigger{ -StringcronName-AbstractDynamicCronHandlerdynamicCronHandler-StringcronExpression-CronSequenceGeneratorsequenceGenerator}classScheduledDynamicCron{ +value()String+cronName()String+handler()Class<?extendsAbstractDynamicCronHandler>}3.2 DynamicCronScheduleTaskManagerimportorg.springframework.beans.factory.DisposableBean;importorg.springframework.scheduling.config.ScheduledTask;importorg.springframework.scheduling.config.ScheduledTaskRegistrar;importorg.springframework.scheduling.config.TriggerTask;importjava.util.HashMap;importjava.util.Map;/***@authorHuangJS*@date--:下午*/publicclassDynamicCronScheduleTaskManagerimplementsDisposableBean{ privateMap<String,ScheduledTask>dynamicScheduledTaskMap=newHashMap<>();ScheduledTaskRegistrarregistrar;//添加定时任务publicScheduledTaskaddTriggerTask(StringcronName,TriggerTasktask){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}scheduledTask=this.registrar.scheduleTriggerTask(task);dynamicScheduledTaskMap.put(cronName,scheduledTask);returnscheduledTask;}publicbooleancontains(StringcronName){ returnthis.dynamicScheduledTaskMap.containsKey(cronName);}//更新定时任务的触发时机publicvoidupdateTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.get(cronName);if(scheduledTask==null){ thrownewIllegalStateException("InvalidcronName""+cronName+"",nofundScheduledTask");}scheduledTask.cancel();scheduledTask=this.registrar.scheduleTriggerTask((TriggerTask)scheduledTask.getTask());dynamicScheduledTaskMap.put(cronName,scheduledTask);}//移除定时任务publicvoidremoveTriggerTask(StringcronName){ ScheduledTaskscheduledTask=dynamicScheduledTaskMap.remove(cronName);if(scheduledTask!=null){ scheduledTask.cancel();}}@Overridepublicvoiddestroy()throwsException{ for(ScheduledTaskvalue:dynamicScheduledTaskMap.values()){ value.cancel();}this.dynamicScheduledTaskMap.clear();}}3.3 AbstractDynamicCronHandlerpublicabstractclassAbstractDynamicCronHandler{ @AutowiredprivateDynamicCronScheduleTaskManagerdynamicCronScheduleTaskManager;/***获取cron表达式*@return*/publicabstractStringgetCronExpression(StringcronName);/***更新cronName对应的定时任务的触发时机*@paramcronName*/publicvoidupdateTriggerTask(StringcronName){ dynamicCronScheduleTaskManager.updateTriggerTask(cronName);}}3.4 EnvironmentDynamicCronHandler基于Environment,在刷新配置时,自动刷新定时任务的触发时机,支持分布式多节点集群部署。
如,cron表达式配置在nacos,更新nacos上的配置时由于监听了EnvironmentChangeEvent事件实现了定时任务的触发时机的更新
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.cloud.context.environment.EnvironmentChangeEvent;importorg.springframework.context.EnvironmentAware;importorg.springframework.context.event.EventListener;importorg.springframework.core.env.Environment;/***@authorHuangJS*@date--:上午*/publicclassEnvironmentDynamicCronHandlerextendsAbstractDynamicCronHandlerimplementsEnvironmentAware{ privatefinalLoggerlogger=LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);privateEnvironmentenvironment;@OverridepublicStringgetCronExpression(StringcronName){ try{ returnenvironment.getProperty(cronName);}catch(Exceptione){ logger.error(e.getMessage(),e);}returnnull;}@OverridepublicvoidsetEnvironment(Environmentenvironment){ this.environment=environment;}@EventListenerpublicvoidenvironmentChangeEvent(EnvironmentChangeEventevent){ for(Stringkey:event.getKeys()){ if(this.dynamicCronScheduleTaskManager.contains(key)){ this.dynamicCronScheduleTaskManager.updateTriggerTask(key);}}}}3.5 DynamicCronTriggerpublicclassDynamicCronTriggerimplementsTrigger{ privatefinalstaticLoggerLOGGER=LoggerFactory.getLogger(DynamicCronTrigger.class);privateStringcronName;privateAbstractDynamicCronHandlerdynamicCronHandler;privateStringcronExpression;privateCronSequenceGeneratorsequenceGenerator;publicDynamicCronTrigger(StringcronName,AbstractDynamicCronHandlerdynamicCronHandler){ this.cronName=cronName;this.dynamicCronHandler=dynamicCronHandler;}@OverridepublicDatenextExecutionTime(TriggerContexttriggerContext){ StringcronExpression=dynamicCronHandler.getCronExpression(cronName);if(cronExpression==null){ returnnull;}if(this.sequenceGenerator==null||!cronExpression.equals(this.cronExpression)){ try{ this.sequenceGenerator=newCronSequenceGenerator(cronExpression);this.cronExpression=cronExpression;}catch(Exceptione){ LOGGER.error(e.getMessage(),e);}}Datedate=triggerContext.lastCompletionTime();if(date!=null){ Datescheduled=triggerContext.lastScheduledExecutionTime();if(scheduled!=null&&date.before(scheduled)){ //Previoustaskapparentlyexecutedtooearly...//Let'ssimplyusethelastcalculatedexecutiontimethen,//inordertopreventaccidentalre-firesinthesamesecond.date=scheduled;}}else{ date=newDate();}returnthis.sequenceGenerator.next(date);}}3.6 注解类ScheduledDynamicCron@Target({ ElementType.METHOD,ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic@interfaceScheduledDynamicCron{ /***动态cron名称*@return*/@AliasFor("cronName")Stringvalue()default"";/***动态cr