1.python线程池(ThreadPoolExecutor)与进程池(ProcessPoolExecutor)的源码简单用法
2.java线程池之ScheduledThreadPoolExecutor实现原理
3.Python 线程池 (thread pool) 创建及使用 + 实例代码
4.ThreadPoolExecutor简介&源码解析
5.手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理
python线程池(ThreadPoolExecutor)与进程池(ProcessPoolExecutor)的源码简单用法
Python中的ThreadPoolExecutor和ProcessPoolExecutor,均来自concurrent.futures模块,源码它们允许主线程监控子线程或子进程的源码状态和任务结果。submit方法返回Future对象,源码用于跟踪任务进度和状态。源码前一选股 源码
ThreadPoolExecutor下,源码初始时四个任务都处于未完成状态。源码2.5秒后,源码task1和task2完成,源码而task3和task4由于sleep,源码状态未变。源码wait方法接受任务序列、源码超时时间和返回条件,源码如在2.5秒后,源码task1和task2已完成,但task3和task4仍在执行。
map函数则用于按顺序返回线程执行结果,如果超时,会抛出TimeoutError。as_completed则返回已完成的任务,其返回顺序与任务执行结束的顺序一致。
对于需要充分利用多核CPU的场景,如频繁的CPU操作,由于GIL(全局解释器锁)的影响,ProcessPoolExecutor相较于ThreadPoolExecutor具有更高的客源拓客源码执行效率。在处理斐波拉切等计算密集型任务时,进程池的3.3秒完成时间小于线程池的4.9秒,体现出进程的优势。
在实际使用上,ProcessPoolExecutor与ThreadPoolExecutor在futures模块中的方法基本相似,但ProcessPoolExecutor在map方法中额外提供了chunksize参数,用于分割大型迭代对象,以提高性能。
java线程池之ScheduledThreadPoolExecutor实现原理
java中异步周期任务调度有Timer,ScheduledThreadPoolExecutor等实现,目前单机版的定时调度都是使用ScheduledThreadPoolExecutor去实现,那么它是如何实现周期执行任务的呢?其实它还是利用ThreadPoolExecutor线程池去执行任务,这一点从它是继承自ThreadPoolExecutor救可以看的出来,其实关键在于如何实现任务的周期性调度,ScheduledThreadPoolExecutor类以及核心函数首先ScheduledThreadPoolExecutor是实现ScheduledExecutorService接口,它主要定义了四个方法:
周期调度一个Runnable的对象
周期调度一个Callable的对象
固定周期调度Runnable对象 (不管上一次Runnable执行结束的时间,总是以固定延迟时间执行 即 上一个Runnable执行开始时候 + 延时时间 = 下一个Runnable执行的时间点)
以固定延迟调度unnable对象(当上一个Runnable执行结束后+固定延迟 = 下一个Runnable执行的时间点)
publicinterfaceScheduledExecutorServiceextendsExecutorService{ publicScheduledFuture<?>schedule(Runnablecommand,longdelay,TimeUnitunit);public<V>ScheduledFuture<V>schedule(Callable<V>callable,longdelay,TimeUnitunit);publicScheduledFuture<?>scheduleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,TimeUnitunit);publicScheduledFuture<?>scheduleWithFixedDelay(Runnablecommand,longinitialDelay,longdelay,TimeUnitunit);}其次,ScheduledThreadPoolExecutor是继承ThreadPoolExecutor,所以它是借助线程池的能力去执行任务,然后自身去实现周期性调度。从构造方法调用父类的线程池的构造方法,核心线程数是构造方法传入,这里可以看到最大线程数是Integer的最大值即, 还有等待队列是DelayedWorkQueue,它是实现延时的关键.
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}scheduleAtFixedRate是实现周期性调度的方法,调度任务就是实现Runnable对象, 以及系统的开始延时时间,周期的调度的间隔时间。
计算初始触发时间和执行周期,量化走势线源码并和传入的Runnable对象作为参数封装成 ScheduledFutureTask,然后调用decorateTask装饰Tas(默认实现为空)。
设置ScheduledFutureTask对象outerTask为t(默认就是它自己)。
调用delayedExecute延迟执行任务。
publicScheduledFuture<?>scheduleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,**TimeUnitunit){ if(command==null||unit==null)thrownewNullPointerException();if(period<=0)thrownewIllegalArgumentException();ScheduledFutureTask<Void>sft=newScheduledFutureTask<Void>(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));RunnableScheduledFuture<Void>t=decorateTask(command,sft);sft.outerTask=t;delayedExecute(t);returnt;}判断线程池状态,如果不是处于running状态,则拒绝该任务。
将该任务加入父类的延迟队列(实际为初始化的DelayedWorkQueue对象)
再次判断线程池不是处于running状态,并且,判断是否是处于shutdown状态并且continueExistingPeriodicTasksAfterShutdown标志是否是true(默认是false,表示是否线程次处于shutdown状态下是否继续执行周期性任务),若果为true,则从队列删除任务,false,则确保启动线程来执行周期性任务
privatevoiddelayedExecute(RunnableScheduledFuture<?>task){ if(isShutdown())reject(task);else{ super.getQueue().add(task);if(isShutdown()&&!canRunInCurrentRunState(task.isPeriodic())&&remove(task))task.cancel(false);elseensurePrestart();}}获取线程池数量
如果小于核心线程数,则启动核心线程执行任务,如果线程数为空,则启动非核心线程
voidensurePrestart(){ intwc=workerCountOf(ctl.get());if(wc<corePoolSize)addWorker(null,true);elseif(wc==0)addWorker(null,false);}ScheduledFutureTask的run函数获取是否是周期性任务
判断是否线程池状态是否可以执行任务,如果为true,则取消任务 3 如果是非周期性任务,则直接调用父类FutureTask的run方法, 4 如果是周期性任务,则调用FutureTask的runAndReset函数, 如果该函数返回为true,则调用setNextRunTime设置下一次运行的时间, 并且还行reExecutePeriodic再次执行周期性任务。
publicvoidrun(){ booleanperiodic=isPeriodic();if(!canRunInCurrentRunState(periodic))cancel(false);elseif(!periodic)ScheduledFutureTask.super.run();elseif(ScheduledFutureTask.super.runAndReset()){ setNextRunTime();reExecutePeriodic(outerTask);}}判断线程池是否处于可执行任务的状态,如果为true,视频桌面软件源码则重新将设置下一次运行时间的任务加入父类的等待队列,
如果线程池处于不可运行任务的状态,则并且从等待队列中移除成功, 调用任务的取消操作,否则调用ensurePrestart确保启动线程执行任务
voidreExecutePeriodic(RunnableScheduledFuture<?>task){ if(canRunInCurrentRunState(true)){ super.getQueue().add(task);if(!canRunInCurrentRunState(true)&&remove(task))task.cancel(false);elseensurePrestart();}}DelayedWorkQueue类核心函数DelayedWorkQueue是继承AbstractQueue,并实现BlockingQueue接口
staticclassDelayedWorkQueueextendsAbstractQueue<Runnable>implementsBlockingQueue<Runnable>{核心字段
//初始容量为privatestaticfinalintINITIAL_CAPACITY=;//等待队列,只能保存RunnableScheduledFuture对象privateRunnableScheduledFuture<?>[]queue=newRunnableScheduledFuture<?>[INITIAL_CAPACITY];//锁privatefinalReentrantLocklock=newReentrantLock();//对俄大小privateintsize=0;//leader线程,表示最近需要执行的任务的线程。privateThreadleader=null;//条件锁privatefinalConditionavailable=lock.newCondition();offer函数:
将添加的参数转换成RunnableScheduledFuture对象。
加全局锁。
获取当前队列的size,如果等于队列的长度,则嗲用grow扩容,增加%的数组长度。
size加1。
如果数组为0,则将加入的对象放在索引为0的位置,然后设置ScheduledFutureTask的heapIndex的索引(便于后续快速删除)。
调用siftUp做堆的上浮操作,这里是小根堆的操作。
如果队列中第一个元素是传入的对象,则将laader设置null
释放锁
返回true
publicbooleanoffer(Runnablex){ if(x==null)thrownewNullPointerException();RunnableScheduledFuture<?>e=(RunnableScheduledFuture<?>)x;finalReentrantLocklock=this.lock;lock.lock();try{ inti=size;if(i>=queue.length)grow();size=i+1;if(i==0){ queue[0]=e;setIndex(e,0);}else{ siftUp(i,e);}if(queue[0]==e){ leader=null;available.signal();}}finally{ lock.unlock();}returntrue;}siftUp主要就是做小根堆的上移操作,从if (key.compareTo(e) >= 0) 看出,如果key大于parent索引的元素,则停止。源码下载怎么使用
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}0poll函数
加锁
获取队列中索引为0的云元素,若果为null或者第一个元素的执行时间戳时间大于当前时间则直接返回null,否则调用finishPoll将第一个元素返回.
释放锁
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}1将队列size 减 1
获取队列中队列中最后一个元素,并且设置队列最后一个为null
最后一个元素不为null,则调用sfitdown进行,将最后一个元素设置到索引为0的位置,将下移操作,重新调整小根堆。
ScheduledFutureTask的heapIndex为-1
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}2ScheduledFutureTask的compareTo函数ScheduledFutureTask实现compareTo方法逻辑
首先比较是否是同一个对象
若果是ScheduledFutureTask对象,则比较time的大小,time是下一次执行的任务的时间戳,如果不是,则比较 getDelay的时间大小
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}3ScheduledThreadPoolExecutor的take函数就是ThreadPoolExecutor的从任务队列中获取任务,没有任务则一直等待(这里是线程数小于核心线程数的情况)
加可中断锁
获取队列中第一个元素的任务,从前面可以知道此任务执行的时间戳最小的任务
如果第一个任务为空,则再全局的锁的条件锁上等待,
如果第一个任务不为空,则获取延迟时间,如果延时时间小于0,说明第一个任务已经到时间了,则返回第一个任务。
如果leader线程不为空,则让线程在全局锁的条件锁上等待
如果leader为空,则将获取第一个任务的当前线程赋值为leader变量。
在全局锁的条件锁上等待delay纳秒, 等待结束后,如果当前线程还是等于leader线程,则重置leader为空
最后判断 leader为空并且第一个任务不为空,则唤醒全局锁上条件锁的等待的线程。
释放全局锁。
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}4总结\ 综合前面所述,线程池从DelayedWorkQueue每次取出的任务就是延迟时间最小的任务, 若果到达时间的任务,则执行任务,否则则用条件锁Conditon的wait进行等待,执行完后,则用signal进行唤醒下一个任务的执行。
Python 线程池 (thread pool) 创建及使用 + 实例代码
Python中的线程池是处理高并发问题的重要工具,它通过管理一组线程来提高效率,避免频繁创建和销毁线程带来的开销。线程池的基本概念是维护一定数量的线程,等待分配给任务,确保资源的有效利用。创建线程池的目的是为了在处理大量请求时,控制线程数量,防止资源过度消耗。
在Python中,可以利用内置的concurrent.futures库中的ThreadPoolExecutor来构建线程池。这个类允许你指定线程池的最大工作线程数,并提供了如submit、result和cancel等方法来管理任务执行。例如,你可以创建一个线程池实例,设置最大线程数为2,然后提交一个任务到线程池中执行。
线程池设计时,关键点包括:预先创建线程数量,任务分发给线程处理,当任务完成后,线程池会自动调度下一个任务。Python的map函数在处理迭代器时,可以并发执行函数,这也是线程池功能的一种体现。
线程池的实现不仅能提升程序性能,还考虑了线程池的伸缩性和资源管理,确保在处理大量请求时,既能充分利用系统资源,又避免了不必要的线程切换开销。通过合理的线程池设计,可以在Python开发中更有效地应对并发挑战。
ThreadPoolExecutor简介&源码解析
线程池是通过池化管理线程的高效工具,尤其在多核CPU时代,利用线程池进行并行处理任务有助于提升服务器性能。ThreadPoolExecutor是线程池的具体实现,它负责线程管理和任务管理,以及处理任务拒绝策略。这个类提供了多种功能,如通过Executors工厂方法配置,执行Runnable和Callable任务,维护任务队列,统计任务完成情况等。
创建线程池需要考虑关键参数,如核心线程数(任务开始执行时立即创建),最大线程数(任务过多时限制新线程生成),线程存活时间,任务队列大小,线程工厂以及拒绝策略。JDK提供了四种拒绝策略,如默认的AbortPolicy,当资源饱和时抛出异常。此外,线程池还提供了beforeExecute和afterExecute钩子函数,用于在任务执行前后执行自定义操作。
当任务提交到线程池时,会经历一系列处理流程,包括任务的执行和线程池状态的管理。例如,如果任务队列和线程池满,会根据拒绝策略处理新任务。使用线程池时,需注意线程池容量与状态的计算,以及线程池工作线程的动态调整。
示例中,自定义线程池并重写钩子函数,创建任务后向线程池提交,可以看到线程池如何根据配置动态调整资源。但要注意,如果任务过多且无法处理,可能会抛出异常。源码分析中,submit方法实际上是调用execute,而execute内部包含Worker类和runWorker方法的逻辑,包括任务的获取和执行。
线程池的容量上限并非Integer.MAX_VALUE,而是由ctl变量的低位决定。 Doug Lea的工具函数简化了ctl的操作,使得计算线程池状态和工作线程数更加便捷。通过深入了解ThreadPoolExecutor,开发者可以更有效地利用线程池提高应用性能。
手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理
本文旨在通过手写一个线程池,来深入理解ThreadPoolExecutor线程池的实现原理。首先,线程池的核心目标是资源管理和性能优化,通过池化技术减少线程创建和销毁的开销。手写线程池的实现步骤包括确定核心流程和添加辅助流程,虽然代码简单,但能体现核心的池化思想。
手写线程池的实现涉及到状态管理,如线程池数量和状态的记录,这部分在ThreadPoolExecutor中通过AtomicInteger的高3位和低位实现。线程池的状态流转包括RUNNING、BLOCKED等,并通过execute方法提交任务,这个过程与我们自己的实现类似,包括任务的执行、加入队列和策略决策。
添加执行任务的过程分为增加线程数量和启动线程,这部分与我们最初的设想基本一致。在runWorker方法中,执行线程的核心是调用task.run(),同时还会涉及队列任务的获取。这些步骤与手写线程池的逻辑相吻合,帮助我们更好地理解线程池的工作机制。
总结来说,通过对比分析和实践,我们对ThreadPoolExecutor的线程池实现有了更深入的理解,包括状态管理、任务提交和执行流程等。深入阅读源码后,你会发现线程池的复杂性和优化设计。如果你对Java线程池感兴趣,这将是一次很好的学习和实践机会。