欢迎来到皮皮网网首页

【hashmap源码 实现原理】【hotween 源码】【介入 源码】线程池队列源码_线程池 队列

来源:高效源码阅读模型 时间:2024-12-28 17:35:49

1.C语言实现高并发服务器上线程池原理(含源码)
2.线程池原理详解及如何用C语言实现线程池
3.C++ 11 实现(同步的线程)任务队列线程池
4.线程池newCachedThreadPool
5.java 线程池 工作队列是如何工作的
6.线程池的四种创建方式及区别

线程池队列源码_线程池 队列

C语言实现高并发服务器上线程池原理(含源码)

       在高并发服务器场景中,线程池作为一种高效的池队程池多线程处理策略,旨在有效利用资源。列源其工作流程通常包括接收消息、码线分类、队列创建线程、线程hashmap源码 实现原理传递任务、池队程池线程执行和任务完成。列源对于小型局域网,码线这种方法足够,队列但在广域网或大型局域网中,线程频繁的池队程池请求可能导致线程频繁创建和销毁,这在内存资源有限的列源嵌入式服务器中尤为关键。

       因此,码线线程池技术应运而生,队列通过复用线程,一个线程可以处理不同任务,避免了频繁创建和销毁的开销。理解线程池的结构十分重要,它由任务队列、线程集合(包括工作线程、空闲线程和待销毁线程)和管理者线程组成。任务队列负责存储待处理任务,以先进先出的方式组织;线程集合则负责执行任务;管理者线程则负责监控线程状态,动态调整线程数量以维持最佳性能。

       线程池的核心结构包括一个threadpool_t结构体,其中包含线程池状态、任务队列信息,以及用于同步操作的互斥锁。任务结构中包含处理函数的指针和相关参数。在使用时,需将分类后的处理函数与参数打包为任务,并放入队列,等待线程执行。

       为了深入学习,你可以参考一些资源,hotween 源码例如加入Linux内核技术交流群,获取学习资料和书籍推荐。而想要在嵌入式开发领域进入互联网大厂,理解并掌握线程池的原理和实现是必不可少的。内核学习网站也是个不错的资源来源。

线程池原理详解及如何用C语言实现线程池

       线程池是一种优化多线程处理形式的机制,特别适用于高并发服务器,能高效管理线程资源。其处理流程为:接收消息、消息分类、线程创建、传递消息到子线程、线程分离、在子线程执行任务、任务结束退出。对于小型网络通信,上述方法通常足够。但在广域网或大型局域网通信中,频繁请求服务器导致大量消息处理,创建和销毁线程成为高昂开销。因此,线程池技术应运而生,允许线程复用,避免每次请求创建新线程的开销。

       线程池内部由任务队列、一组线程和管理者线程组成。任务队列用于存储待处理任务,线程用于执行任务,管理者线程则负责控制线程活动,确保线程资源合理利用。

       实现线程池涉及以下几个关键结构和操作:

       线程池结构详解

       线程池结构体包含:

       线程池状态信息:描述线程池配置,包括最小线程数、最大线程数、存活线程数等。

       任务队列信息:描述任务队列状态,介入 源码包括最大任务数、队列是否满、是否为空等。

       多线程互斥锁:确保线程安全操作,如取任务、修改任务队列信息和线程池状态。

       任务函数指针和参数:用于存储和传递消息处理函数及所需参数。

       线程池初始化函数`threadpool_create`负责分配资源,设置参数,并启动线程。

       线程数组

       线程数组是线程池初始化阶段预先创建的一组线程,用于高效执行任务。这些线程在工作时竞争获取任务,通过互斥锁确保线程安全。

       任务队列

       任务队列用于存储待处理任务。线程池通过任务队列组织任务执行顺序,确保先进先出原则,同时与线程数组协同工作,高效分配任务。

       管理者线程

       管理者线程负责监控线程池状态,动态调整线程数量,以维持高效平衡。它定期检查任务队列和线程状态,根据需要启动或销毁线程。

       线程池接口

       线程池接口提供初始化、添加任务、销毁等操作,便于在实际应用中集成和管理。

       释放资源

       线程池提供资源释放接口,确保线程池安全关闭,避免资源泄漏。

       使用示例

       示例代码展示了如何使用线程池接口创建线程池、添加任务和销毁线程池。

C++ 实现(同步的)任务队列线程池

       在C++ 中,实现同步任务队列线程池是阿帕奇源码一种高效利用多核CPU处理任务的方法。当线程池启动时,线程会等待队列中的任务,一旦任务充足,多个线程便会竞争执行。然而,为了防止内存溢出,队列容量设有限制。线程池的性能与CPU核心数密切相关,推荐设置为核心数加2,过多的线程可能导致线程切换频繁,反而降低效率。

       线程池的活动流程包括任务的生产与消费,通过上篇博文中介绍的同步队列SyncQueue进行管理。生产者线程如thd1持续产生任务,线程池内部负责消费。停止线程池时,只需调用Stop函数即可。同时,使用std::call_once确保特定代码只执行一次,确保并发执行的线程中只有一个活动线程执行,保证数据可见性的一致性。

       std::thread::hardware_concurrency函数提供了获取系统硬件并发线程数的功能,这对于优化线程池大小和资源分配至关重要。如果你是C/C++后端开发或架构师,寻找学习资料和教程,可以加入学习交流群获取免费资源。

       下面是一个直观的线程池实现示例:

       线程池通过C++ 的同步队列SyncQueue管理任务,利用多核CPU提高任务处理效率。队列容量设有限制以防止内存溢出,推荐线程数设置为CPU核心数加2。线程活动流程包括任务生产(thd1)和消费,通过Stop函数可暂停或终止线程池。

       std::call_once用于确保特定代码只执行一次,确保并发线程中数据可见性的一致性。另外,l 源码使用std::thread::hardware_concurrency获取硬件并发数,有助于优化线程池配置。

线程池newCachedThreadPool

       新线程池newCachedThreadPool的源码揭示了其独特设计和功能。它的核心特点在于动态创建和重用线程,以提高执行短暂异步任务的程序性能。此池允许在先前构造的线程可用时重复使用它们,且最大线程数为Integer.MAX_VALUE,意味着资源使用相对灵活。

       在newCachedThreadPool中,线程的存活时间设置为秒,超过此时间未使用的线程将被终止并从池中移除。这一特性有助于避免资源浪费,保持空闲时间足够长的池不会消耗任何资源。此外,新线程池不包含核心线程,其操作基于SynchronousQueue队列,确保线程间高效同步。

       使用newCachedThreadPool时,程序执行到大约秒后自动终止,因为线程池已完成所有任务。存活线程在超过秒的闲置后被终止和移除,这体现了其设计原理。

       为何newCachedThreadPool选择SynchronousQueue而不是其他线程池通常采用的LinkedBlockQueue?SynchronousQueue是一个特殊的阻塞队列,旨在实现线程间高效同步。它没有内部容量,且插入操作需等待相应的删除操作。此特性使其成为切换设计的理想选择,允许线程在需要时安全地传递信息、事件或任务,尤其适用于需要多线程间同步的应用场景。

       SynchronousQueue通过实现Collection和Iterator接口支持所有可选方法,包括支持可选的公平性策略。默认情况下,不保证生产者和使用者线程的FIFO顺序访问,但通过将公平性策略设置为true,可以确保按此顺序授予访问权限。

       总之,newCachedThreadPool通过动态线程重用和SynchronousQueue的高效同步机制,提供了一种灵活且高效的处理短暂异步任务的方法。其设计旨在优化资源使用,通过在任务完成后的秒内自动清理资源,保持系统性能高效。

java 线程池 工作队列是如何工作的

       使用线程池的好处

       1、降低资源消耗

       可以重复利用已创建的线程降低线程创建和销毁造成的消耗。

       2、提高响应速度

       当任务到达时,任务可以不需要等到线程创建就能立即执行。

       3、提高线程的可管理性

       线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控

       线程池的工作原理

       首先我们看下当一个新的任务提交到线程池之后,线程池是如何处理的

       1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。

       2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步

       3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

       线程池饱和策略

       这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:

       AbortPolicy

       为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。

       DiscardPolicy

       直接抛弃,任务不执行,空方法

       DiscardOldestPolicy

       从队列里面抛弃head的一个任务,并再次execute 此task。

       CallerRunsPolicy

       在调用execute的线程里面执行此command,会阻塞入口

       用户自定义拒绝策略(最常用)

       实现RejectedExecutionHandler,并自己定义策略模式

       下我们以ThreadPoolExecutor为例展示下线程池的工作流程图

       1.jpg

       2.jpg

       1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

       2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

       3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

       4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

       ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

       线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇

       3.jpg

       关键方法源码分析

       我们看看核心方法添加到线程池方法execute的源码如下:

       //     //Executes the given task sometime in the future.  The task     //may execute in a new thread or in an existing pooled thread.     //     // If the task cannot be submitted for execution, either because this     // executor has been shutdown or because its capacity has been reached,     // the task is handled by the current { @code RejectedExecutionHandler}.     //     // @param command the task to execute     // @throws RejectedExecutionException at discretion of     //         { @code RejectedExecutionHandler}, if the task     //         cannot be accepted for execution     // @throws NullPointerException if { @code command} is null     //    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        //         // Proceed in 3 steps:         //         // 1. If fewer than corePoolSize threads are running, try to         // start a new thread with the given command as its first         // task.  The call to addWorker atomically checks runState and         // workerCount, and so prevents false alarms that would add         // threads when it shouldn't, by returning false.         // 翻译如下:         // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,         // 如果能完成新线程创建exexute方法结束,成功提交任务         // 2. If a task can be successfully queued, then we still need         // to double-check whether we should have added a thread         // (because existing ones died since last checking) or that         // the pool shut down since entry into this method. So we         // recheck state and if necessary roll back the enqueuing if         // stopped, or start a new thread if there are none.         // 翻译如下:         // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态         // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要         // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;         // 3. If we cannot queue task, then we try to add a new         // thread.  If it fails, we know we are shut down or saturated         // and so reject the task.         // 翻译如下:         // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池         // 已经达到饱和状态,所以reject这个他任务         //        int c = ctl.get();        // 工作线程数小于核心线程数        if (workerCountOf(c) < corePoolSize) {            // 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize            if (addWorker(command, true))                return;            c = ctl.get();        }        // 如果工作线程数大于等于核心线程数        // 线程的的状态未RUNNING并且队列notfull        if (isRunning(c) && workQueue.offer(command)) {            // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                // 移除成功,拒绝该非运行的任务                reject(command);            else if (workerCountOf(recheck) == 0)                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务                addWorker(null, false);        }        // 如果队列满了或者是非运行的任务都拒绝执行        else if (!addWorker(command, false))            reject(command);    }

线程池的四种创建方式及区别

       1. 使用Executors.newCachedThreadPool()创建线程池。

        这种线程池会根据需要创建新线程,对于旧线程,如果秒没有被使用,就会被回收。这种线程池适用于短生命周期的异步任务。

       2. 使用Executors.newFixedThreadPool()创建定长线程池。

        这个线程池维护了一个固定大小的线程队列,如果新任务提交到线程池时,线程池已满,任务就会被保存在任务队列中,待线程空闲时再执行。

       3. 使用Executors.newSingleThreadExecutor()创建单线程池。

        这个线程池只有一个线程工作,保证所有任务按照提交顺序执行。如果线程因为异常而结束,会有一个新的线程来替代它。

       4. 使用Executors.newScheduledThreadPool()创建定时线程池。

        这个线程池不仅可以处理并发任务,还能处理定时和周期性任务。线程池会根据任务的执行时间来安排任务。

       线程池的拒绝策略:

       当线程池达到最大容量且队列也满了时,线程池会采用拒绝策略处理新提交的任务。默认的拒绝策略是直接抛出异常,但也可以自定义拒绝策略或使用其他推荐策略,如调用者运行策略、丢弃策略等。

       关于代码段:

       代码段中的ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); 是正确的创建定时线程池的方式,但之后的代码片段没有正确地实现定时任务,而是使用了scheduleAtFixedRate方法,这需要一个Runnable对象。如果要执行定时任务,应该创建一个实现了Runnable接口的类,并重写run方法。

ThreadPoolExecutor简介&源码解析

       线程池是通过池化管理线程的高效工具,尤其在多核CPU时代,利用线程池进行并行处理任务有助于提升服务器性能。ThreadPoolExecutor是线程池的具体实现,它负责线程管理和任务管理,以及处理任务拒绝策略。这个类提供了多种功能,如通过Executors工厂方法配置,执行Runnable和Callable任务,维护任务队列,统计任务完成情况等。

       创建线程池需要考虑关键参数,如核心线程数(任务开始执行时立即创建),最大线程数(任务过多时限制新线程生成),线程存活时间,任务队列大小,线程工厂以及拒绝策略。JDK提供了四种拒绝策略,如默认的AbortPolicy,当资源饱和时抛出异常。此外,线程池还提供了beforeExecute和afterExecute钩子函数,用于在任务执行前后执行自定义操作。

       当任务提交到线程池时,会经历一系列处理流程,包括任务的执行和线程池状态的管理。例如,如果任务队列和线程池满,会根据拒绝策略处理新任务。使用线程池时,需注意线程池容量与状态的计算,以及线程池工作线程的动态调整。

       示例中,自定义线程池并重写钩子函数,创建任务后向线程池提交,可以看到线程池如何根据配置动态调整资源。但要注意,如果任务过多且无法处理,可能会抛出异常。源码分析中,submit方法实际上是调用execute,而execute内部包含Worker类和runWorker方法的逻辑,包括任务的获取和执行。

       线程池的容量上限并非Integer.MAX_VALUE,而是由ctl变量的低位决定。 Doug Lea的工具函数简化了ctl的操作,使得计算线程池状态和工作线程数更加便捷。通过深入了解ThreadPoolExecutor,开发者可以更有效地利用线程池提高应用性能。