1.java线程池(一):java线程池基本使用及Executors
2.还不了解Java的源码5大BlockingQueue阻塞队列源码,看这篇文章就够了
3.可动态配置的源码Schedule设计
4.线程池newCachedThreadPool
5.如何评价synchronousqueue?
java线程池(一):java线程池基本使用及Executors
@[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的源码日常工作中已经不太会用到,但是源码线程池恰恰相反,是源码我们日常工作中必不可少的工具之一。现在开始对线程池的源码界面开发库源码使用,以及底层ThreadPoolExecutor的源码源码进行分析。1.为什么需要线程池我们在前面对线程基础以及线程的源码生命周期有过详细介绍。一个基本的源码常识就是,线程是源码一个特殊的对象,其底层是源码依赖于JVM的native方法,在jvm虚拟机内部实现的源码。线程与普通对象不一样的源码地方在于,除了需要在堆上分配对象之外,源码还需要给每个线程分配一个线程栈、源码以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的cgic源码分析隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。
2.java中线程池的实现在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。
后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。
3.创建线程的工厂方法Executors在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。
3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。
3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,powerproject管理源码那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。
3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。
2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的imagemagick源码包源码。 与之类似的方法:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。
3.5 newWorkStealingPool这种方式是在jdk1.8之后新增的。我们先来看看其源码:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0这个方法实际上返回的是ForkJoinPool。该方法创建了一
还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了
引言
本文将详细解读Java中常见的5种BlockingQueue阻塞队列,包括它们的优缺点、区别以及典型应用场景,以帮助深入理解这5种队列的独特性质和使用场合。
常见的BlockingQueue有以下5种:
1. **基于数组实现的阻塞队列**:创建时需指定容量大小,是有限队列。
2. **基于链表实现的阻塞队列**:默认无界,可自定义容量。
3. **无缓冲阻塞队列**:生产的数据需立即被消费,无缓冲。
4. **优先级阻塞队列**:支持元素按照大小排序,无界。
5. **延迟阻塞队列**:基于PriorityQueue实现,无界。
**BlockingQueue简介
**BlockingQueue作为接口,定义了放数据和取数据的多组方法,适用于并发多线程环境,特别适合生产者-消费者模式。
**应用场景
**BlockingQueue的作用类似于消息队列,用于解耦、异步处理和削峰,适用于线程池的核心功能实现。
**区别与比较
**- **ArrayBlockingQueue**:基于数组实现,容量可自定义。
- **LinkedBlockingQueue**:基于链表实现,无界或自定义容量。
- **SynchronousQueue**:同步队列,生产者和消费者直接交互,无需缓冲。
- **PriorityBlockingQueue**:实现优先级排序,无界队列。
- **DelayQueue**:本地延迟队列,支持元素延迟执行。
在选择使用哪种队列时,需考虑具体任务的特性、吞吐量需求以及是否需要优先级排序或延迟执行。
本文旨在提供全面理解Java中BlockingQueue的指南,从源码剖析到应用场景,帮助开发者更好地应用这些工具于实际项目中。
可动态配置的wigner ville源码Schedule设计
1.背景
定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。
你可以使用Linux的Crontab启动应用程序进行处理,或者直接使用Spring的Schedule对任务进行调度,还可以使用分布式调度系统,如果xxl-job等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:
1.新建一组任务,周期性的执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;
停顿几分钟,简单思考一下,有哪几种实现思路呢?
本篇文章将从以下几部分进行讨论:
1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;
2.SpringScheduleSpringBoot上的Schedule的使用非常简单,无需增加新的依赖,只需简单配置即可。
1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;
首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。
@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}runTask任务延迟1s进行初始化,并以5s为间隔进行调度。
Scheduled注解类的详细配置如下:
配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致环境搭建完成,让我们开始第一个方案。
3.数据库定时轮询使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。
3.1.串行执行方案整体思路非常简单,流程如下:
主要分如下几步:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。
例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。
究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。
让我们看一个改进方案:并行执行方案。
3.2.并行执行方案整体执行流程如下:
相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:
1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;
核心代码如下:
Spring调度任务,每1秒运行一次:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}自定义线程池,每个线程池最多只有一个线程,空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:
privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}最后,在线程池中运行的Task如下:
privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。
TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。
publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:
publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。
CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:
scheduler.schedule(task,newCronTrigger("-**MON-FRI"));基础部分简单介绍到这,让我们看下数据库动态配置方案。
4.1数据库动态配置方案整体设计如下:
仍旧是轮询数据库方式,详细流程如下:
1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;
核心代码如下:
@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}核心辅助类:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。
4.2配置中心通知方案整体设计如下:
核心流程如下:
1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。
由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。
5.分布式环境下应用以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。
以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。
为了简单,我们使用Redis的分布式锁。
5.1.环境搭建Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。
首先,在pom中引入RedissonStarter。
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1然后,在application.properties文件中增加Redis配置,具体如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:
@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3备注:
Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。
6.小结本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。
仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。
References[1]源码:/litao/books/tree/master/configurable-schedule
线程池newCachedThreadPool
新线程池newCachedThreadPool的源码揭示了其独特设计和功能。它的核心特点在于动态创建和重用线程,以提高执行短暂异步任务的程序性能。此池允许在先前构造的线程可用时重复使用它们,且最大线程数为Integer.MAX_VALUE,意味着资源使用相对灵活。
在newCachedThreadPool中,线程的存活时间设置为秒,超过此时间未使用的线程将被终止并从池中移除。这一特性有助于避免资源浪费,保持空闲时间足够长的池不会消耗任何资源。此外,新线程池不包含核心线程,其操作基于SynchronousQueue队列,确保线程间高效同步。
使用newCachedThreadPool时,程序执行到大约秒后自动终止,因为线程池已完成所有任务。存活线程在超过秒的闲置后被终止和移除,这体现了其设计原理。
为何newCachedThreadPool选择SynchronousQueue而不是其他线程池通常采用的LinkedBlockQueue?SynchronousQueue是一个特殊的阻塞队列,旨在实现线程间高效同步。它没有内部容量,且插入操作需等待相应的删除操作。此特性使其成为切换设计的理想选择,允许线程在需要时安全地传递信息、事件或任务,尤其适用于需要多线程间同步的应用场景。
SynchronousQueue通过实现Collection和Iterator接口支持所有可选方法,包括支持可选的公平性策略。默认情况下,不保证生产者和使用者线程的FIFO顺序访问,但通过将公平性策略设置为true,可以确保按此顺序授予访问权限。
总之,newCachedThreadPool通过动态线程重用和SynchronousQueue的高效同步机制,提供了一种灵活且高效的处理短暂异步任务的方法。其设计旨在优化资源使用,通过在任务完成后的秒内自动清理资源,保持系统性能高效。
如何评价synchronousqueue?
《解读Java源码专栏》深入解析Java核心组件源码,内容覆盖集合、线程、线程池、并发、队列等,旨在剖析设计思想与实现细节,助你轻松应对工作面试。
本文聚焦于Java中的SynchronousQueue阻塞队列源码解析,前文已讲解了ArrayBlockingQueue和LinkedBlockingQueue,那么SynchronousQueue又是基于什么实现的呢?答案是:它基于一种特殊的机制,使得生产者与消费者之间的操作同步进行,互不影响。
SynchronousQueue的用途在于快速执行任务,避免任务积压在队列中。与前文所学的队列不同,它并不具备缓冲功能,而是充当了生产者与消费者之间的桥梁。生产者将元素放入队列后必须等待消费者取走,反之亦然,形成一种同步操作机制。
作为BlockingQueue接口的实现类,SynchronousQueue提供了放数据与取数据的多种方法,以适应不同场景。其底层机制基于Transferer抽象类,进一步细化为栈与队列两种实现方式。
本文详细解析了SynchronousQueue的类结构、初始化方法、栈实现与队列实现等关键点,展示了其基于公平策略的不同构造方法。同时,通过源码分析,深入理解了放数据与弹出数据的具体实现逻辑,以及如何通过工具方法调用底层操作。
放数据方面,包括offer、add、put、offer(e, time, unit)等方法,它们分别根据是否匹配到合适节点而决定插入成功与否,或进行阻塞等待。弹出数据则通过poll、remove、take、poll(time, unit)等方法实现,同样基于匹配节点与否的逻辑进行操作。
此外,文章还涉及了查看数据的peek与element方法,但由于SynchronousQueue不支持查看数据的操作,故返回null。至此,对SynchronousQueue的源码解析全面展开。
总结,本文通过详细解读SynchronousQueue的源码,揭示了其作为阻塞队列的独特机制与功能实现。未来文章将继续深入探讨其他阻塞队列的源码解析,敬请期待。
2025-01-16 11:52
2025-01-16 11:23
2025-01-16 10:45
2025-01-16 10:43
2025-01-16 10:15
2025-01-16 09:47