1.ForkjoinPool -1
2.java线程池(一):java线程池基本使用及Executors
3.yarn源码分析(四)AppMaster启动
4.Java原理系列ScheduledThreadPoolExecutor原理用法示例源码详解
ForkjoinPool -1
ForkJoinæ¯ç¨äºå¹¶è¡æ§è¡ä»»å¡çæ¡æ¶ï¼ æ¯ä¸ä¸ªæ大任å¡åå²æè¥å¹²ä¸ªå°ä»»å¡ï¼æç»æ±æ»æ¯ä¸ªå°ä»»å¡ç»æåå¾å°å¤§ä»»å¡ç»æçæ¡æ¶ãForkå°±æ¯æä¸ä¸ªå¤§ä»»å¡åå为è¥å¹²åä»»å¡å¹¶è¡çæ§è¡ï¼Joinå°±æ¯å并è¿äºåä»»å¡çæ§è¡ç»æï¼æåå¾å°è¿ä¸ªå¤§ä»»å¡çç»æãä¸é¢æ¯ä¸ä¸ªæ¯ä¸ä¸ªç®åçJoin/Fork计ç®è¿ç¨ï¼å°1âæ°åç¸å
é常è¿æ ·ä¸ªæ¨¡åï¼ä½ 们ä¼æ³å°ä»ä¹ï¼
Release Framework ï¼ å¸¸è§çå¤ç模åæ¯ä»ä¹ï¼ task pool - worker poolç模åã ä½æ¯Forkjoinpool éåäºå®å ¨ä¸åç模åã
ForkJoinPoolä¸ç§ExecutorServiceçå®ç°ï¼è¿è¡ForkJoinTaskä»»å¡ãForkJoinPoolåºå«äºå ¶å®ExecutorServiceï¼ä¸»è¦æ¯å 为å®éç¨äºä¸ç§å·¥ä½çªå(work-stealing)çæºå¶ãææ被ForkJoinPool管çç线ç¨å°è¯çªåæ交å°æ± åéçä»»å¡æ¥æ§è¡ï¼æ§è¡ä¸åå¯äº§çåä»»å¡æ交å°æ± åä¸ã
ForkJoinPoolç»´æ¤äºä¸ä¸ªWorkQueueçæ°ç»(æ°ç»é¿åº¦æ¯2çæ´æ°æ¬¡æ¹ï¼èªå¨å¢é¿)ãæ¯ä¸ªworkQueueé½æä»»å¡éå(ForkJoinTaskçæ°ç»)ï¼å¹¶ä¸ç¨baseãtopæåä»»å¡éåéå°¾åé头ãwork-stealingæºå¶å°±æ¯å·¥ä½çº¿ç¨æ¨ä¸ªæ«æä»»å¡éåï¼å¦æéåä¸ä¸ºç©ºååéå°¾çä»»å¡å¹¶æ§è¡ã示æå¾å¦ä¸
æµç¨å¾ï¼
poolå±æ§
workQueuesæ¯poolçå±æ§ï¼å®æ¯WorkQueueç±»åçæ°ç»ãexternalPushåexternalSubmitæå建çworkQueue没æowner(å³ä¸æ¯worker)ï¼ä¸ä¼è¢«æ¾å°workQueuesçå¶æ°ä½ç½®ï¼ècreateWorkerå建çworkQueueï¼å³workerï¼æownerï¼ä¸ä¼è¢«æ¾å°workQueuesçå¥æ°ä½ç½®ã
WorkQueueçå 个éè¦æååé说æå¦ä¸ï¼
è¿æ¯WorkQueueçconfigï¼é«ä½è·poolçconfigå¼ä¿æä¸è´ï¼èä½ä½åæ¯workQueueå¨workQueuesæ°ç»çä½ç½®ã
ä»workQueueså±æ§çä»ç»ä¸ï¼æ们ç¥éï¼ä¸æ¯ææworkQueueé½æworkerï¼æ²¡æworkerçworkQueueç§°ä¸ºå ¬å ±éåï¼shared queueï¼ï¼configç第ä½å°±æ¯ç¨æ¥å¤ææ¯å¦æ¯å ¬å ±éåçãå¨externalSubmitå建工ä½éåæ¶ï¼æï¼
q.config = k | SHARED_QUEUE;
å ¶ä¸qæ¯æ°å建çworkQueueï¼kå°±æ¯qå¨workQueuesæ°ç»ä¸çä½ç½®ï¼SHARED_QUEUE=1<<ï¼æ³¨æè¿éconfig没æä¿çmodeçä¿¡æ¯ã
èå¨registerWorkerä¸ï¼åæ¯è¿æ ·ç»workQueueçconfigèµå¼çï¼
w.config = i | mode;
wæ¯æ°å建çworkQueueï¼iæ¯å ¶å¨workQueuesæ°ç»ä¸çä½ç½®ï¼æ²¡æ设置SHARED_QUEUEæ è®°ä½
scanStateæ¯workQueueçå±æ§ï¼æ¯intç±»åçãscanStateçä½ä½å¯ä»¥ç¨æ¥å®ä½å½åworkerå¤äºworkQueuesæ°ç»çåªä¸ªä½ç½®ãæ¯ä¸ªworkerå¨è¢«å建æ¶ä¼å¨å ¶æé å½æ°ä¸è°ç¨poolçregisterWorkerï¼èregisterWorkerä¼ç»scanStateèµä¸ä¸ªåå§å¼ï¼è¿ä¸ªå¼æ¯å¥æ°ï¼å 为workeræ¯ç±createWorkerå建ï¼å¹¶ä¼è¢«æ¾å°WorkQueuesçå¥æ°ä½ç½®ï¼ècreateWorkerå建workeræ¶ä¼è°ç¨registerWorkerã
ç®è¨ä¹ï¼workerçscanStateåå§å¼æ¯å¥æ°ï¼éworkerçscanstateåå§å¼=INACTIVE=1<<ï¼å°äº0ï¼éworkerçworkQueueå¨externalSubmitä¸å建ï¼ã
å½æ¯æ¬¡è°ç¨signalWorkï¼ætryReleaseï¼å¤éworkeræ¶ï¼workerçé«ä½å°±ä¼å 1
å¦å¤ï¼scanState<0表示workeræªæ¿æ´»ï¼å½workerè°ç¨runtaskæ§è¡ä»»å¡æ¶ï¼scanStateä¼è¢«ç½®ä¸ºå¶æ°ï¼å³è®¾ç½®scanStateçæå³è¾¹ä¸ä½ä¸º0ã
workerä¼ç æ¶ï¼æ¯è¿æ ·åå¨ç
workerçå¤é类似è¿æ ·ï¼
å¨workerä¼ç ç4è¡ä¼ªç ä¸ï¼è®©ctlçä½ä½çå¼å为worker.scanStateï¼è¿æ ·ä¸æ¬¡å°±å¯ä»¥éè¿scanStateå¤é该workerãå¤é该workeræ¶ï¼æ该workerçpreStack设置为ctlä½ä½çå¼ï¼è¿æ ·ä¸ä¸æ¬¡å¤éçworkerå°±æ¯scanStateçäºè¯¥preStackçworkerã
è¿ééè¿preStackä¿åä¸ä¸ä¸ªworkerï¼è¿ä¸ªworkeræ¯å½åworkeræ´æ©å°å¨çå¾ ï¼æ以形æä¸ä¸ªåè¿å åºçæ ã
runStateæ¯intç±»åçå¼ï¼æ§å¶æ´ä¸ªpoolçè¿è¡ç¶æåçå½å¨æï¼æä¸é¢å 个å¼ï¼å¯ä»¥å¥½å 个å¼åæ¶åå¨ï¼ï¼
å¦ærunStateå¼ä¸º0ï¼è¡¨ç¤ºpoolå°æªåå§åã
RSLOCK表示éå®poolï¼å½æ·»å workeråpoolç»æ¢æ¶ï¼å°±è¦ä½¿ç¨RSLOCKéå®æ´ä¸ªpoolãå¦æç±äºrunState被éå®ï¼å¯¼è´å ¶ä»æä½çå¾ runState解éï¼é常ç¨waitè¿è¡çå¾ ï¼ï¼å½runState设置äºRSIGNALï¼è¡¨ç¤ºrunState解éï¼å¹¶éç¥ï¼notifyAllï¼çå¾ çæä½ã
å©ä¸4个å¼é½è·runStateçå½å¨ææå ³ï¼é½å¯ä»¥é¡¾åæä¹ï¼
å½éè¦åæ¢æ¶ï¼è®¾ç½®runStateçSTOPå¼ï¼è¡¨ç¤ºåå¤å ³éï¼è¿æ ·å ¶ä»æä½çå°è¿ä¸ªæ è®°ä½ï¼å°±ä¸ä¼ç»§ç»æä½ï¼æ¯å¦tryAddWorkerçå°STOPå°±ä¸ä¼åå建workerï¼
ètryTerminate对è¿äºçå½å¨æç¶æçå¤çåæ¯è¿æ ·çï¼
å½åtopåbaseçåå§å¼ä¸º INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2ãç¶åpushä¸ä¸ªtaskä¹åï¼top+=1ï¼ä¹å°±æ¯è¯´ï¼top对åºçä½ç½®æ¯æ²¡ætaskçï¼æè¿pushè¿æ¥çtaskå¨top-1çä½ç½®ãèbaseçä½ç½®åè½å¯¹åºå°taskï¼base对åºæå æ¾è¿éåçtaskï¼top-1对åºæåæ¾è¿éåçtaskã
qlockå¼å«ä¹ï¼1: locked, < 0: terminate; else 0
å³å½qlockå¼ä½0æ¶ï¼å¯ä»¥æ£å¸¸æä½ï¼å¼=1æ¶ï¼è¡¨ç¤ºéå®
int SQMASK=0xeï¼åä»»ä½æ´æ°è·SQMASKä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ã
è¯æï¼
注æè¿éå为äºè¿å¶æ¯ ï¼å°¤å ¶æ³¨ææå³è¾¹ç¬¬ä¸ä½æ¯0ï¼ä»»ä½æ°è·æå³è¾¹ç¬¬ä¸ä½æ¯0çæ°ä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ï¼å 为ä½ä¸ä¹åï¼ç¬¬ä¸ä½å°±æ¯0ï¼æ¯å¦s=A&SQMASKï¼Aå¯ä»¥æ¯ä»»ææ´æ°ï¼ç¶åæsæäºè¿å¶è¿è¡å¤é¡¹å¼å±å¼ï¼åæs=2 n1+2 n2 â¦â¦+2^nnï¼è¿énâ¥1ï¼æ以så¯ä»¥è¢«2æ´é¤ï¼å³sæ¯å¶æ°ã
æ以ä¸ä¸ªæ°æ¯å¥æ°è¿æ¯å¶æ°ï¼çå ¶æå³è¾¹ç¬¬ä¸ä½å³å¯ã
æ们ç¥éworkQueueæexternalPushå建çåcreateWorkerå建çworkerï¼ä¸¤ç§æ¹å¼å建çworkQueueï¼å ¶æ¾ç½®å°workQueuesçä½ç½®æ¯ä¸åçï¼åè æ¾å°workQueueçå¶æ°ä½ç½®ï¼èåè åæ¾å°å¥æ°ä½ç½®ãä¸åworkQueueæ¾å°èªå·±å¨workQueuesçä½ç½®çç®æ³æç¹ä¸åã
ä¸é¢çä¸ä¸forkjoinæ¡æ¶è·åworkQueuesä¸çå¶æ°ä½ç½®çworkQueueçç®æ³ï¼
è¿æ ·å°±è½è·åworkQueuesçå¶æ°ä½ç½®çworkQueueãmä¿è¯m & r & SQMASKè¿æ´ä¸ªè¿ç®ç»æä¸ä¼è¶ åºworkQueuesçä¸æ ï¼SQMASKä¿è¯åå°çæ¯å¶æ°ä½ç½®çworkQueueãè¿éæä¸ä¸ªæ趣çç°è±¡ï¼å设0å°workQueues.length-1ä¹é´æn个å¶æ°ï¼m & r & SQMASKæ¯æ¬¡é½è½åå°å ¶ä¸ä¸ä¸ªå¶æ°ï¼èä¸è¿ç»n次åå°çå¶æ°ä¸ä¼åºç°éå¤å¼ï¼æ£åæ§é常好ãèä¸æ¯å¾ªç¯çï¼å³1å°n次ån个ä¸åå¶æ°ï¼n+1å°2nä¹æ¯ån次ä¸åå¶æ°ï¼æ¤æ¶n个å¶æ°æ¯ä¸ªé½è¢«éæ°åä¸æ¬¡ãä¸é¢åæä¸rå¼æä»ä¹ç§å¯ï¼ä¸ºä½è½ä¿è¯è¿æ ·çæ£åæ§
ThreadLocalRandomå æä¸å¸¸éPROBE_INCREMENT = 0x9eb9ï¼ä»¥åä¸ä¸ªéæçprobeGenerator =new AtomicInteger() ï¼ç¶åæ¯ä¸ªçº¿ç¨çprobe= probeGenerator.addAndGet(PROBE_INCREMENT)æ以第ä¸ä¸ªçº¿ç¨çprobeå¼æ¯0x9eb9ï¼ç¬¬äºä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9ï¼ç¬¬ä¸ä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9+0x9eb9以æ¤ç±»æ¨ï¼æ´ä¸ªå¼æ¯çº¿æ§çï¼å¯ä»¥ç¨y=kx表示ï¼å ¶ä¸k=0x9eb9ï¼x表示第å 个线ç¨ãè¿æ ·æ¯ä¸ªçº¿ç¨çprobeå¯ä»¥ä¿è¯ä¸ä¸æ ·ï¼èä¸å ·æå¾å¥½ç离æ£æ§ã
å®é ä¸ï¼å¯ä»¥ä¸ç¨0x9eb9è¿ä¸ªå¼ï¼ç¨ä»»æä¸ä¸ªå¥æ°é½æ¯å¯ä»¥çï¼æ¯å¦1ãå¦æç¨1çè¯ï¼probe+=1ï¼è¿æ ·æ¯ä¸ªçº¿ç¨çprobeå°±é½æ¯ä¸åçï¼èä¸å ·æå¾å¥½ç离æ£æ§ãä¹å°±æ¯è¯´ï¼å设æéå¶æ¡ä»¶probe<nï¼è¶ è¿nå产ç溢åºãåprobeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼n次åprobeæ¯æ¬¡èªå çå¼é½ä¸åãå®é ä¸ç¨ä»»æä¸ä¸ªå¥æ°ï¼é½å¯ä»¥ä¿è¯probeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼æå ´è¶£å¯çæ¬ææåéå½é¨åãç±äºå¥æ°ç离æ£æ§ï¼æ以åªè¦çº¿ç¨æ°å°äºmæè SQMASK两è ä¸çæå°å¼ï¼åæ¯ä¸ªçº¿ç¨é½è½å¯ä¸å°å æ®ä¸ä¸ªwsä¸çä¸ä¸ªä½ç½®
å½ä¸ä¸ªæä½æ¯å¨éForkjoinThreadç线ç¨ä¸è¿è¡çï¼å称该æä½ä¸ºå¤é¨æä½ãæ¯å¦æ们åé¢æ§è¡pool.invokeï¼invokeå åæ§è¡externalPushãç±äºinvokeæ¯å¨éForkjoinThread线ç¨ä¸è¿è¡çï¼è¿éæ¯å¨main线ç¨ä¸è¿è¡ï¼ï¼æ以æ¯ä¸ä¸ªå¤é¨æä½ï¼è°ç¨çæ¯externalPushãä¹åtaskçæ§è¡æ¯éè¿ForkJoinThreadæ¥æ§è¡çï¼æ以taskä¸çforkå°±æ¯å é¨æä½ï¼è°ç¨çæ¯pushï¼æä»»å¡æ交å°å·¥ä½éåãå ¶å®forkçå®ç°æ¯ç±»ä¼¼ä¸é¢è¿æ ·çï¼
å³forkä¼æ ¹æ®æ§è¡èªèº«ç线ç¨æ¯å¦æ¯ForkJoinThreadçå®ä¾æ¥å¤ææ¯å¤äºå¤é¨è¿æ¯å é¨ãé£ä¸ºä½è¦åºåå å¤é¨ï¼
ä»»ä½çº¿ç¨é½å¯ä»¥ä½¿ç¨ForkJoinæ¡æ¶ï¼ä½æ¯å¯¹äºéForkJoinThreadç线ç¨ï¼å®å°åºæ¯ææ ·çï¼ForkJoinæ æ³æ§å¶ï¼ä¹æ æ³å¯¹å ¶ä¼åãå æ¤åºååºå å¤é¨ï¼è¿æ ·æ¹ä¾¿ForkJoinæ¡æ¶å¯¹ä»»å¡çæ§è¡è¿è¡æ§å¶åä¼å
forkJoinPool.invoke(task)æ¯æä»»å¡æ¾å ¥å·¥ä½éåï¼å¹¶çå¾ ä»»å¡æ§è¡ãæºç å¦ä¸
è¿éexternalPushè´è´£ä»»å¡æ交ï¼externalPushæºç å¦ä¸ï¼
java线程池(一):java线程池基本使用及Executors
@[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的精典登录的源码源码进行分析。1.为什么需要线程池我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的rocketmq源码教学绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的求源码指标线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。
2.java中线程池的实现在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。
后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。
3.创建线程的工厂方法Executors在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的付呗源码模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。
3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。pdf源码推荐因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。
3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。
3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。
2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。
3.5 newWorkStealingPool这种方式是在jdk1.8之后新增的。我们先来看看其源码:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0这个方法实际上返回的是ForkJoinPool。该方法创建了一
yarn源码分析(四)AppMaster启动
在容器分配完成之后,启动容器的代码主要在ContainerImpl.java中进行。通过状态机转换,container从NEW状态向其他状态转移时,会调用RequestResourceTransition对象。RequestResourceTransition负责将所需的资源进行本地化,或者避免资源本地化。若需本地化,还需过渡到LOCALIZING状态。为简化理解,此处仅关注是否进行资源本地化的情况。
为了将LAUNCH_CONTAINER事件加入事件处理队列,调用了sendLaunchEvent方法。该事件由ContainersLauncher负责处理。ContainersLauncher的handle方法中,使用一个ExecutorService(线程池)容器Launcher。ContainerLaunch实现了Callable接口,其call方法生成并执行launch_container脚本。以MapReduce框架为例,该脚本在hadoop.tmp.dir/application name/container name目录下生成,其主要作用是启动MRAppMaster进程,即MapReduce的ApplicationMaster。
Java原理系列ScheduledThreadPoolExecutor原理用法示例源码详解
ScheduledThreadPoolExecutor是Java中实现定时任务与周期性执行任务的高效工具。它继承自ThreadPoolExecutor类,能够提供比常规Timer类更强大的灵活性与功能,特别是在需要多个工作线程或有特殊调度需求的场景下。
该类主要功能包含但不限于提交在指定延迟后执行的任务,以及按照固定间隔周期执行的任务。它实现了ScheduledExecutorService接口,进而提供了丰富的API以实现任务的调度与管理。其中包括now()、getDelay()、compareTo()等方法,帮助开发者更精确地处理任务调度与延迟。
在实际应用中,ScheduledThreadPoolExecutor的使用案例广泛。比如,初始化一个ScheduledThreadPoolExecutor实例,设置核心线程数,从而为定时任务提供资源保障。提交延迟任务,例如在5秒后执行特定操作,并输出相关信息。此外,提交周期性任务,如每隔2秒执行一次特定操作,用于实时监控或数据更新。最后,通过调用shutdown()与shutdownNow()方法来关闭执行器并等待所有任务完成,确保系统资源的合理释放与任务的有序结束。
总的来说,ScheduledThreadPoolExecutor在处理需要精确时间控制的任务时展现出了强大的功能与灵活性,是Java开发者在实现定时与周期性任务时的首选工具。