1.池池-网络技术名称
2.java线程池(一):java线程池基本使用及Executors
3.图解+源码讲解 Nacos 客户端动态监听配置机制
池池-网络技术名称
池(Pool)的概念在服务器端软件开发中被广泛应用,旨在显著提升应用程序的运行速度、优化效率并降低系统资源消耗。在面对大量并发客户访问时,使用池结构能有效避免因频繁创建和销毁服务对象而导致的系统资源开销,从而提高整体性能。gbdt源码编译池可以被视为一个容器,存储和复用各种所需对象,减少资源创建和销毁的开销。池的设计通常分为两种类型:一种用于处理客户提交的任务,通常称为线程池(Thread Pool);另一种供客户获取并使用对象,称为资源池(Resource Pool)。 资源池的基本思想是预先建立并维护可重用的资源集合,如Socket连接、JDBC连接、CORBA对象等。通过资源池,服务器在响应多个客户请求时,能够提供对共享资源的高效管理。当客户请求特定类型的资源时,服务器从池中获取已实例化的资源对象提供服务,而非为每个客户创建新的资源,从而避免了资源的频繁创建和销毁,显著提高了资源利用效率。 为了实现资源池,首先需要定义一个`ConnManger`对象,它负责创建、管理和控制多个`Pool`对象,以及为客户提供资源访问方法。成交量的三条线金叉的源码`Pool`对象使用特定数据结构存储指定类型的资源对象,并定义了一个抽象接口`IpoolItem`,所有需要重用的资源对象都必须实现该接口。通过这三个基本类及其方法,可实现简单的资源池。为增强管理能力,可进一步为`ConnManger`和`Pool`对象添加错误恢复机制,如`repairConnection()`和`fixConnection()`方法,以处理资源异常,确保异常资源得到修复,不影响其他客户访问。动态管理功能则通过后台运行的监控线程实现,该线程根据系统运行状态动态调整池的大小,以应对请求高峰时提供更多资源,以及在请求低谷时回收资源,实现资源的高效利用。 线程池(Thread Pool)专门针对处理来自客户端的高密度、短时间的任务请求。通过复用已存在的线程对象,线程池降低了线程创建和销毁的开销,提高了任务响应速度,从而整体提升了系统服务性能。线程池设计中,任务被抽象为接口`Itask`,并通过`ThreadPool`对象管理和创建可重用的`TaskRunnable`线程对象。`TaskRunnable`执行由`ThreadPool`传递的任务,而`ThreadPool`通过`runTask`方法接收客户提交的任务,并选择合适的39d八位二进制源码等于线程对象执行。 对于线程池,同样需考虑线程逻辑与应用逻辑的分离,避免将应用逻辑固化在实现中,以确保线程池的灵活性和适用性。在`TaskRunnable`对象中实现错误恢复机制,如`notifyThreadCrash`方法,以监控线程运行状况并确保所有任务正常执行。动态管理功能通过后台监控线程实现,根据系统状态动态调整线程池大小,以应对请求高峰和低谷,实现资源的高效分配和管理。扩展资料
池,汉字,意义有水塘、湖、护城河、旧时剧场中正厅前部。另有池姓。java线程池(一):java线程池基本使用及Executors
@[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的日常工作中已经不太会用到,但是线程池恰恰相反,是我们日常工作中必不可少的工具之一。现在开始对线程池的使用,以及底层ThreadPoolExecutor的源码进行分析。1.为什么需要线程池我们在前面对线程基础以及线程的生命周期有过详细介绍。一个基本的常识就是,线程是一个特殊的对象,其底层是源码编辑器跑酷游戏第二季依赖于JVM的native方法,在jvm虚拟机内部实现的。线程与普通对象不一样的地方在于,除了需要在堆上分配对象之外,还需要给每个线程分配一个线程栈、以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。通达信一线天选股公式源码这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。
2.java中线程池的实现在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。
后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。
3.创建线程的工厂方法Executors在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。
3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。
3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。
3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。
2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的源码。 与之类似的方法:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。
3.5 newWorkStealingPool这种方式是在jdk1.8之后新增的。我们先来看看其源码:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0这个方法实际上返回的是ForkJoinPool。该方法创建了一
图解+源码讲解 Nacos 客户端动态监听配置机制
图解+源码讲解 Nacos 客户端动态监听配置机制
在人生中第一要紧的是发现自己。为了这个目的,各位时常需要孤独和深思 —— 南森 Nacos 源码分析系列相关文章
从零开始看 Nacos 源码环境搭建
图解+源码讲解 Nacos 客户端发起注册流程
图解+源码讲解 Nacos 服务端处理注册请求逻辑
图解+源码讲解 Nacos 客户端下线流程
图解+源码讲解 Nacos 服务端处理下线请求
图解+源码讲解 Nacos 客户端发起心跳请求
图解+源码讲解 Nacos 服务端处理心跳请求
图解+源码讲解 Nacos 服务端处理配置获取请求
图解+源码讲解 Nacos 客户端动态监听配置机制
NacosConfigAutoConfiguration我们看到这里面其实注入了一个 Nacos 配置刷新的关键 NacosContextRefresherBean
@Configuration@ConditionalOnProperty(name?=?"spring.cloud.nacos.config.enabled",?matchIfMissing?=?true)public?class?NacosConfigAutoConfiguration?{ //?Nacos?配置属性@Beanpublic?NacosConfigProperties?nacosConfigProperties(ApplicationContext?context)?{ if?(context.getParent()?!=?null&&?BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context.getParent(),?NacosConfigProperties.class).length?>?0)?{ return?BeanFactoryUtils.beanOfTypeIncludingAncestors(context.getParent(),NacosConfigProperties.class);}return?new?NacosConfigProperties();}//?Nacos?配置刷新属性@Beanpublic?NacosRefreshProperties?nacosRefreshProperties()?{ return?new?NacosRefreshProperties();}//?Nacos?刷新历史@Beanpublic?NacosRefreshHistory?nacosRefreshHistory()?{ return?new?NacosRefreshHistory();}//?Nacos?配置管理@Beanpublic?NacosConfigManager?nacosConfigManager(NacosConfigProperties?nacosConfigProperties)?{ return?new?NacosConfigManager(nacosConfigProperties);}//?Nacos?配置刷新@Beanpublic?NacosContextRefresher?nacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?nacosRefreshHistory)?{ return?new?NacosContextRefresher(nacosConfigManager,?nacosRefreshHistory);}}NacosContextRefresher 配置中心刷新public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}获取配置服务 getConfigServicenacosConfigManager.getConfigService(),这行代码其实就是为了创建 NcaosConfigService 对象,我们看看你是怎么创建的,其实核心代码就是通过 NacosFactory 反射创建的 NcaosConfigService 对象,这个对象是一个核心对象后续会讲到的
public?static?ConfigService?createConfigService(Properties?properties)?throws?NacosException?{ try?{ //?加载?NacosConfigService?类Class<?>?driverImplClass?=?Class.forName("com.alibaba.nacos.client.config.NacosConfigService");//?获取构造器Constructor?constructor?=?driverImplClass.getConstructor(Properties.class);//?创建实例ConfigService?vendorImpl?=?(ConfigService)?constructor.newInstance(properties);return?vendorImpl;}?catch?(Throwable?e)?{ throw?new?NacosException(NacosException.CLIENT_INVALID_PARAM,?e);}}监听器NacosContextRefresher 实现了 ApplicationListener ,一看这就是一个监听器了,我们看看这个在监听器里面做了什么操作
@Overridepublic?void?onApplicationEvent(ApplicationReadyEvent?event)?{ //?这是一个?CAS?操作,只设置一次if?(this.ready.compareAndSet(false,?true))?{ //?注册?Nacos?监听器对于应用this.registerNacosListenersForApplications();}}注册 Nacos 监听/**register Nacos Listeners. 注册Nacos监听器 */ private void registerNacosListenersForApplications() { // 默认是 true if (isRefreshEnabled()) { // 遍历Nacos属性资源中心 for (NacosPropertySource propertySource : NacosPropertySourceRepository .getAll()) { if (!propertySource.isRefreshable()) { continue; } // 获取资源ID ?String dataId = propertySource.getDataId(); // 通过组和 dataId 注册 Nacos 监听器 registerNacosListener(propertySource.getGroup(), dataId); } } }
private void registerNacosListener(final String groupKey, final String dataKey) { // 构建 Key 信息 String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); // 在 listenerMap中放入了 key 对应 AbstractSharedListener 响应的方法 Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() { @Override public void innerReceive(String dataId, String group, String configInfo) { // 刷新次数 refreshCountIncrement(); // 记录刷新历史,就是改变历史 nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); // 发布刷新事件 applicationContext.publishEvent( new RefreshEvent(this, null, "Refresh Nacos config")); } }); // 向配置服务中添加监听器 configService.addListener(dataKey, groupKey, listener);
}
####?向配置服务中添加监听器 此时调用的是?NacosConfigService?中的?addListener?方法,但是最终执行的还是?ClientWorker?中的?addTenantListeners?方法,后面会进行分析?ClientWorker?这个类的```java@Overridepublic?void?addListener(String?dataId,?String?group,?Listener?listener)?throws?NacosException?{ //?这个?ClientWorker?worker?也是一个核心类worker.addTenantListeners(dataId,?group,?Arrays.asList(listener));}构建 CacheData 信息此时向 ClientWorker 中的 CacheData 中添加数据,之后遍历监听器添加到 CacheData 中
创建 CacheData 对象public?CacheData(ConfigFilterChainManager?configFilterChainManager,?String?name,?String?dataId,?String?group,String?tenant)?{ //?dataId?不能为空if?(null?==?dataId?||?null?==?group)?{ throw?new?IllegalArgumentException("dataId="?+?dataId?+?",?group="?+?group);}this.name?=?name;this.configFilterChainManager?=?configFilterChainManager;this.dataId?=?dataId;?//?设置dataIdthis.group?=?group;?//?设置组信息this.tenant?=?tenant;?//?设置租户listeners?=?new?CopyOnWriteArrayList<ManagerListenerWrap>();?//?装饰器集合this.isInitializing?=?true;//?加载缓存数据从本地磁盘this.content?=?loadCacheContentFromDiskLocal(name,?dataId,?group,?tenant);//?计算本地缓存信息的MD5this.md5?=?getMd5String(content);}向 CacheData 中添加数据public?void?addTenantListeners(String?dataId,?String?group,List<?extends?Listener>?listeners)throws?NacosException?{ //?DefaultGroupgroup?=?null2defaultGroup(group);String?tenant?=?agent.getTenant();?//?是?""//?向缓存数据中添加监听器CacheData?cache?=?addCacheDataIfAbsent(dataId,?group,?tenant);for?(Listener?listener?:?listeners)?{ cache.addListener(listener);}}public?CacheData?addCacheDataIfAbsent(String?dataId,?String?group,?String?tenant)throws?NacosException?{ //?获取Key信息String?key?=?GroupKey.getKeyTenant(dataId,?group,?tenant);//?在缓存?Map?中获取缓存数据CacheData?cacheData?=?cacheMap.get(key);//?如果不为空的情况下那么就返回,如果为空那么就创建一个?CacheDataif?(cacheData?!=?null)?{ return?cacheData;}//?创建一个?CacheData?cacheData?=?new?CacheData(configFilterChainManager,?agent.getName(),dataId,?group,?tenant);//?将创建好的?cacheData?放入缓存?Map?中CacheData?lastCacheData?=?cacheMap.putIfAbsent(key,?cacheData);//?如果缓存数据为空的话那么从配置中心拉取,不过此时不为空if?(lastCacheData?==?null)?{ //fix?issue?#?if?(enableRemoteSyncConfig)?{ String[]?ct?=?getServerConfig(dataId,?group,?tenant,?L);cacheData.setContent(ct[0]);}//?计算任务IDint?taskId?=?cacheMap.size()?/?(int)?ParamUtil.getPerTaskConfigSize();//?设置任务IDcacheData.setTaskId(taskId);lastCacheData?=?cacheData;}//?缓存数据初始化完成//?reset?so?that?server?not?hang?this?checklastCacheData.setInitializing(true);LOGGER.info("[{ }]?[subscribe]?{ }",?agent.getName(),?key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());//?返回最新的缓存数据return?lastCacheData;}到这里 CacheData 对象 和 cacheMap 集合已经构建完成了,后续会用到这个数据的
NacosConfigService 分析NacosConfigService这个类在创建的时候主要做了什么事情,这这里面创建了一个 ClientWorker对象,这个对象是一个核心的类,有关于配置的一些操作都是归功于 ClientWorker类
public?NacosConfigService(Properties?properties)?throws?NacosException?{ ......this.agent?=?new?MetricsHttpAgent(new?ServerHttpAgent(properties));this.agent.start();//?核心工作类this.worker?=?new?ClientWorker(this.agent,this.configFilterChainManager,?properties);}核心配置类 ClientWorker分析一下这个类都在做什么事情,都有哪些核心方法 其实能看到里面有一个构造函数、添加缓存数据、添加监听器、检查配置中心相关方法、获取服务配置、解析数据响应、移除缓存数据、删除监听器以及 shutdown方法
构造函数看到这里其实看到了定义了两个调度线程池,一个是用于配置检测的,一个是用于执行长轮询服务的
@SuppressWarnings("PMD.ThreadPoolCreationRule")public?ClientWorker(final?HttpAgent?agent,final?ConfigFilterChainManager?configFilterChainManager,?final?Properties?properties){ this.agent?=?agent;this.configFilterChainManager?=?configFilterChainManager;//?初始化操作init(properties);//?定义一个调度线程池,只有一个线程还是守护线程this.executor?=?Executors.newScheduledThreadPool(1,?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker."?+?agent.getName());t.setDaemon(true);return?t;}});//?定义一个多个线程的调度线程池,线程个数和CPU?核心数有关,也是守护线程,是一个长轮询this.executorService?=?Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),?new?ThreadFactory()?{ @Overridepublic?Thread?newThread(Runnable?r)?{ Thread?t?=?new?Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling."?+agent.getName());t.setDaemon(true);return?t;}});//?定义一个定时的调度任务,第一次执行的时候延时1毫秒,后续毫秒调度一次this.executor.scheduleWithFixedDelay(new?Runnable()?{ @Overridepublic?void?run()?{ try?{ //?检查配置信息方法checkConfigInfo();}?catch?(Throwable?e)?{ LOGGER.error("["?+?agent.getName()?+?"]?"+?"[sub-check]?rotate?check?error",?e);}}},?1L,?L,?TimeUnit.MILLISECONDS);}检查配置服务方法这个 cacheMap 包含了一些任务信息,这里面的任务是怎么来的呢,他是在添加监听器的时候添加的,上面已经分析过了
public?NacosContextRefresher(NacosConfigManager?nacosConfigManager,NacosRefreshHistory?refreshHistory)?{ //?获取配置属性信息this.nacosConfigProperties?=?nacosConfigManager.getNacosConfigProperties();//?刷新历史this.nacosRefreshHistory?=?refreshHistory;//?获取配置服务this.configService?=?nacosConfigManager.getConfigService();//?是否开启刷新,是truethis.isRefreshEnabled?=?this.nacosConfigProperties.isRefreshEnabled();}0长轮询任务 LongPollingRunnable2025-01-16 13:46
2025-01-16 13:09
2025-01-16 13:08
2025-01-16 12:57
2025-01-16 12:47
2025-01-16 11:44