å¦ä½ç¨mavenå°java8åç代ç ç¼è¯ä¸ºjava6å¹³å°ç
ããå¨ä¸è¬çJavaåºç¨å¼åè¿ç¨ä¸ï¼å¼å人å使ç¨Javaçæ¹å¼æ¯è¾ç®åãæå¼æ¯ç¨çIDEï¼ç¼åJavaæºä»£ç ï¼åå©ç¨IDEæä¾çåè½ç´æ¥è¿è¡Java ç¨åºå°±å¯ä»¥äºãè¿ç§å¼å模å¼èåçè¿ç¨æ¯ï¼å¼å人åç¼åçæ¯Javaæºä»£ç æ件ï¼.javaï¼ï¼IDEä¼è´è´£è°ç¨Javaçç¼è¯å¨æJavaæºä»£ç ç¼è¯æå¹³å°æ å ³çåè代ç ï¼byte codeï¼ï¼ä»¥ç±»æ件çå½¢å¼ä¿åå¨ç£çä¸ï¼.classï¼ãJavaèææºï¼JVMï¼ä¼è´è´£æJavaåè代ç å 载并æ§è¡ãJavaéè¿è¿ç§æ¹å¼æ¥å®ç°å ¶âç¼åä¸æ¬¡ï¼å°å¤è¿è¡ï¼Write once, run anywhereï¼â çç®æ ãJavaç±»æ件ä¸å å«çåè代ç å¯ä»¥è¢«ä¸åå¹³å°ä¸çJVMæ使ç¨ãJavaåè代ç ä¸ä» å¯ä»¥ä»¥æ件形å¼åå¨äºç£çä¸ï¼ä¹å¯ä»¥éè¿ç½ç»æ¹å¼æ¥ä¸è½½ï¼è¿å¯ä»¥åªåå¨äºå åä¸ãJVMä¸çç±»å è½½å¨ä¼è´è´£ä»å å«åè代ç çåèæ°ç»ï¼byte[]ï¼ä¸å®ä¹åºJavaç±»ãå¨æäºæ åµä¸ï¼å¯è½ä¼éè¦å¨æççæ Javaåè代ç ï¼ææ¯å¯¹å·²æçJavaåè代ç è¿è¡ä¿®æ¹ãè¿ä¸ªæ¶åå°±éè¦ç¨å°æ¬æä¸å°è¦ä»ç»çç¸å ³ææ¯ãé¦å ä»ç»ä¸ä¸å¦ä½å¨æç¼è¯Javaæºæ件ã
ããå¨æç¼è¯Javaæºæ件
ããå¨ä¸è¬æ åµä¸ï¼å¼å人åé½æ¯å¨ç¨åºè¿è¡ä¹åå°±ç¼åå®æäºå ¨é¨çJavaæºä»£ç 并ä¸æåç¼è¯ã对æäºåºç¨æ¥è¯´ï¼Javaæºä»£ç çå 容å¨è¿è¡æ¶å»æè½ç¡®å®ãè¿ä¸ªæ¶åå°±éè¦å¨æç¼è¯æºä»£ç æ¥çæJavaåè代ç ï¼åç±JVMæ¥å è½½æ§è¡ãå ¸åçåºæ¯æ¯å¾å¤ç®æ³ç«èµçå¨çº¿è¯æµç³»ç»ï¼å¦PKU JudgeOnlineï¼ï¼å 许ç¨æ·ä¸ä¼ Java代ç ï¼ç±ç³»ç»å¨åå°ç¼è¯ãè¿è¡å¹¶è¿è¡å¤å®ãå¨å¨æç¼è¯Javaæºæ件æ¶ï¼ä½¿ç¨çåæ³æ¯ç´æ¥å¨ç¨åºä¸è°ç¨Javaç¼è¯å¨ã
ããJSR å¼å ¥äºJavaç¼è¯å¨APIãå¦æ使ç¨JDK 6çè¯ï¼å¯ä»¥éè¿æ¤APIæ¥å¨æç¼è¯Java代ç ãæ¯å¦ä¸é¢ç代ç ç¨æ¥å¨æç¼è¯æç®åçHello Worldç±»ã该Javaç±»ç代ç æ¯ä¿åå¨ä¸ä¸ªå符串ä¸çã
ãã public class CompilerTest {
ãã public static void main(String[] args) throws Exception {
ãã String source = "public class Main { public static void main(String[] args) { System.out.println(\"Hello World!\");} }";
ãã JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
ãã StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
ãã StringSourceJavaObject sourceObject = newCompilerTest.StringSourceJavaObject("Main", source);
ãã Iterable< extends JavaFileObject> fileObjects = Arrays.asList(sourceObject);
ãã CompilationTask task = compiler.getTask(null, fileManager, null,null, null, fileObjects);
ãã boolean result = task.call();
ãã if (result) {
ãã System.out.println("ç¼è¯æåã");
ãã }
ãã }
ãã
ãã static class StringSourceJavaObject extends SimpleJavaFileObject {
ãã
ãã private String content = null;
ãã public StringSourceJavaObject(String name, String content) throwsURISyntaxException {
ãã super(URI.create("string:///" + name.replace('.','/') + Kind.SOURCE.extension), Kind.SOURCE);
ãã this.content = content;
ãã }
ãã
ãã public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
ãã return content;
ãã }
ãã }
ãã }
ããå¦æä¸è½ä½¿ç¨JDK 6æä¾çJavaç¼è¯å¨APIçè¯ï¼å¯ä»¥ä½¿ç¨JDKä¸çå·¥å ·ç±»com.sun.tools.javac.Mainï¼ä¸è¿è¯¥å·¥å ·ç±»åªè½ç¼è¯åæ¾å¨ç£çä¸çæ件ï¼ç±»ä¼¼äºç´æ¥ä½¿ç¨javacå½ä»¤ã
ããå¦å¤ä¸ä¸ªå¯ç¨çå·¥å ·æ¯Eclipse JDT Coreæä¾çç¼è¯å¨ãè¿æ¯Eclipse Javaå¼åç¯å¢ä½¿ç¨çå¢éå¼Javaç¼è¯å¨ï¼æ¯æè¿è¡åè°è¯æé误ç代ç ã该ç¼è¯å¨ä¹å¯ä»¥åç¬ä½¿ç¨ãPlayæ¡æ¶å¨å é¨ä½¿ç¨äºJDTçç¼è¯å¨æ¥å¨æç¼è¯Javaæºä»£ç ãå¨å¼å模å¼ä¸ï¼Playæ¡æ¶ä¼å®ææ«æ项ç®ä¸çJavaæºä»£ç æ件ï¼ä¸æ¦åç°æä¿®æ¹ï¼ä¼èªå¨ç¼è¯ Javaæºä»£ç ãå æ¤å¨ä¿®æ¹ä»£ç ä¹åï¼å·æ°é¡µé¢å°±å¯ä»¥çå°ååã使ç¨è¿äºå¨æç¼è¯çæ¹å¼çæ¶åï¼éè¦ç¡®ä¿JDKä¸çtools.jarå¨åºç¨ç CLASSPATHä¸ã
ããä¸é¢ä»ç»ä¸ä¸ªä¾åï¼æ¯å ³äºå¦ä½å¨Javaéé¢åååè¿ç®ï¼æ¯å¦æ±åºæ¥(3+4)*7-çå¼ãä¸è¬çåæ³æ¯åæè¾å ¥çè¿ç®è¡¨è¾¾å¼ï¼èªå·±æ¥æ¨¡æ计ç®è¿ç¨ãèèå°æ¬å·çåå¨åè¿ç®ç¬¦çä¼å 级çé®é¢ï¼è¿æ ·ç计ç®è¿ç¨ä¼æ¯è¾å¤æï¼èä¸å®¹æåºéãå¦å¤ä¸ç§åæ³æ¯å¯ä»¥ç¨JSR å¼å ¥çèæ¬è¯è¨æ¯æï¼ç´æ¥æè¾å ¥ç表达å¼å½åJavaScriptææ¯JavaFXèæ¬æ¥æ§è¡ï¼å¾å°ç»æãä¸é¢ç代ç 使ç¨çåæ³æ¯å¨æçæJavaæºä»£ç 并ç¼è¯ï¼æ¥çå è½½Javaç±»æ¥æ§è¡å¹¶è·åç»æãè¿ç§åæ³å®å ¨ä½¿ç¨Javaæ¥å®ç°ã
ãã private static double calculate(String expr) throws CalculationException {
ãã String className = "CalculatorMain";
ãã String methodName = "calculate";
ãã String source = "public class " + className
ãã + " { public static double " + methodName + "() { return " + expr +"; } }";
ãã //çç¥å¨æç¼è¯Javaæºä»£ç çç¸å ³ä»£ç ï¼åè§ä¸ä¸è
ãã boolean result = task.call();
ãã if (result) {
ãã ClassLoader loader = Calculator.class.getClassLoader();
ãã try {
ãã Class<?> clazz = loader.loadClass(className);
ãã Method method = clazz.getMethod(methodName, new Class<?>[] { });
ãã Object value = method.invoke(null, new Object[] { });
ãã return (Double) value;
ãã } catch (Exception e) {
ãã throw new CalculationException("å é¨é误ã");
ãã }
ãã } else {
ãã throw new CalculationException("é误ç表达å¼ã");
ãã }
ãã }
ããä¸é¢ç代ç ç»åºäºä½¿ç¨å¨æçæçJavaåè代ç çåºæ¬æ¨¡å¼ï¼å³éè¿ç±»å è½½å¨æ¥å è½½åè代ç ï¼å建Javaç±»ç对象çå®ä¾ï¼åéè¿Javaåå°APIæ¥è°ç¨å¯¹è±¡ä¸çæ¹æ³ã
ããJavaåè代ç å¢å¼º
ããJava åè代ç å¢å¼ºæçæ¯å¨Javaåè代ç çæä¹åï¼å¯¹å ¶è¿è¡ä¿®æ¹ï¼å¢å¼ºå ¶åè½ãè¿ç§åæ³ç¸å½äºå¯¹åºç¨ç¨åºçäºè¿å¶æ件è¿è¡ä¿®æ¹ãå¨å¾å¤Javaæ¡æ¶ä¸é½å¯ä»¥è§å°è¿ç§å®ç°æ¹å¼ãJavaåè代ç å¢å¼ºé常ä¸Javaæºæ件ä¸ç注解ï¼annotationï¼ä¸å使ç¨ã注解å¨Javaæºä»£ç ä¸å£°æäºéè¦å¢å¼ºçè¡ä¸ºåç¸å ³çå æ°æ®ï¼ç±æ¡æ¶å¨è¿è¡æ¶å»å®æ对åè代ç çå¢å¼ºãJavaåè代ç å¢å¼ºåºç¨çåºæ¯æ¯è¾å¤ï¼ä¸è¬é½éä¸å¨åå°åä½ä»£ç å对å¼å人åå±è½åºå±çå®ç°ç»èä¸ãç¨è¿JavaBeansç人å¯è½å¯¹å ¶ä¸é£äºå¿ 须添å çgetter/setteræ¹æ³æå°å¾ç¹çï¼å¹¶ä¸é¾ä»¥ç»´æ¤ãèéè¿åè代ç å¢å¼ºï¼å¼å人ååªéè¦å£°æBeanä¸çå±æ§å³å¯ï¼getter/setteræ¹æ³å¯ä»¥éè¿ä¿®æ¹åè代ç æ¥èªå¨æ·»å ãç¨è¿JPAç人ï¼å¨è°è¯ç¨åºçæ¶åï¼ä¼åç°å®ä½ç±»ä¸è¢«æ·»å äºä¸äºé¢å¤ç ååæ¹æ³ãè¿äºååæ¹æ³æ¯å¨è¿è¡æ¶å»ç±JPAçå®ç°å¨ææ·»å çãåè代ç å¢å¼ºå¨é¢åæ¹é¢ç¼ç¨ï¼AOPï¼çä¸äºå®ç°ä¸ä¹æ使ç¨ã
Flux和Mono的常用API源码分析
Flux是一个响应式流,能够生成零个、一个、显卡 支持源码输出多个或无限个元素。Flux的产生元素机制主要体现在Flux.just和Flux.empty两个方法上。Flux.just返回的FluxArray内部存储了一个数组,用来保存1个或多个数据,通过ArraySubscription传递给消费者。Flux.empty则返回了一个FluxEmpty实例,当收到消费者注册信号时,会调用Operators的complete方法,消费者会收到一个complete信号,除此之外没有任何操作。
重复流通过创建一个FluxRepeatPredicate对象实现,这个对象在结束时会重新订阅Publisher,从而产生无限数量的流。doOnSignal方法提供了在框架中不消费数据或转变数据的机制,实际上是操作符FluxPeekFuseable,其peek onNext代码逻辑能大致理解其原理。
Mono表示要么有一个元素,要么产生完成或错误信号的Publisher。其then方法有五个重载版本,实际上创建了一个MonoIgnorePublisher,通过源码可以发现,MonoIgnorePublisher将真正的监听者封装为IgnoreElementsSubscriber,然后将事件源监听。Mono和Flux都有Create方法,用于创建对应的序列,Mono的create方法创建了MonoCreate对象,里面包含了MonoSink和一个消费者。Mono的then方法会忽略前面的onNext数据,只会传递给下游完成和错误的ssld指标公式源码信号。then(Mono other)则创建了一个ThenIgnoreMain,并在所有操作完成之后开始下一个流的消费。
Mono和Flux的Create方法创建的对象为MonoCreate和FluxCreate,其中包含了MonoSink或FluxSink和一个消费者。使用using方法可以实现try-with-resource机制,用于包装阻塞API。
在响应式编程中,我们需要处理各种异常情况,确保异常能够传播到需要接收的地方。Publisher分为冷发布者和热发布者,冷发布者在没有订阅者时不会生成数据,而热发布者不论是否有订阅者都会生成数据。冷热发布者可以相互转换,例如使用defer将热操作符转换为冷操作符,或者使用ConnectableFlux将冷操作符转换为热操作符。在多播流中,一个Publisher可以同时给多个消费者提供数据,但只会收到一次的订阅。
FluxPublish对象在publish方法中创建,传入参数包括缓存大小和被包装的队列,这表示了publish方法创建了一个FluxPublish对象。在subscribe阶段,FluxPublish内部的PublishSubscriber会添加到父容器中。在connect方法中,真正订阅数据源,随后PublishSubscriber的onSubscribe方法会执行,根据参数拉取数据,onNext方法处理接收到的数据。
本文通过解析Flux和Mono的常用API,揭示了它们在响应式编程中的应用和原理,旨在帮助读者更好地理解并运用这些流式操作符。正确处理异常、理解冷热发布者之间的转换以及掌握多播流的特性,对于构建高效、简约软件库源码灵活的数据流处理系统至关重要。
EMQX-简介、安装部署、基础功能、python代码测试
MQTT属于是物联网的通信协议,在MQTT协议中有两大角色:客户端(发布者/订阅者),服务端(Mqtt broker);针对客户端和服务端需要有遵循该协议的的具体实现,EMQ/EMQX就是MQTT Broker的一种实现。
EMQX是基于 Erlang/OTP 平台开发的 MQTT 消息服务器,是开源社区中最流行的 MQTT 消息服务器。EMQ X 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制。
到目前为止,比较流行的 MQTT Broker 有几个:使用 C 语言实现的 MQTT Broker,使用 Erlang 语言开发的 MQTT Broker,使用 Node.JS 开发的 MQTT Broker,同样使用 Erlang 开发的 MQTT Broker。从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。
与别的MQTT服务器相比EMQ X 主要有以下的特点:经过+版本的迭代,EMQ X 目前为开源社区中最流行的 MQTT 消息中间件,在各种客户严格的生产环境上经受了严苛的考验;支持丰富的物联网协议,包括 MQTT、MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket等;优化的鸿蒙2.0源码 2021架构设计,支持超大规模的设备连接。企业版单机能支持百万的 MQTT 连接;集群能支持千万级别的 MQTT 连接;易于安装和使用;灵活的扩展性,支持企业的一些定制场景;中国本地的技术支持服务,通过微信、QQ等线上渠道快速响应客户需求;基于 Apache 2.0 协议许可,完全开源。EMQ X 的代码都放在 Github 中,用户可以查看所有源代码;EMQ X 3.0 支持 MQTT 5.0 协议,是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。除了 MQTT 协议之外,EMQ X 还支持别的一些物联网协议;单机支持百万连接,集群支持千万级连接;毫秒级消息转发。EMQ X 中应用了多种技术以实现上述功能;利用 Erlang/OTP 平台的软实时、高并发和容错(电信领域久经考验的语言);全异步架构;连接、会话、路由、集群的分层设计;消息平面和控制平面的分离等;扩展模块和插件,EMQ X 提供了灵活的扩展机制,可以实现私有协议、认证鉴权、数据持久化、桥接发和管理控制台等的扩展;桥接:EMQ X 可以跟别的消息系统进行对接,比如 EMQ X Enterprise 版本中可以支持将消息转发到 Kafka、RabbitMQ 或者别的 EMQ 节点等;共享订阅:共享订阅支持通过负载均衡的方式在多个订阅者之间来分发 MQTT 消息。比如针对物联网等 数据采集场景,会有比较多的设备在发送数据,通过共享订阅的方式可以在订阅端设置多个订阅者来实现这几个订阅者之间的工作负载均衡。
典型的物联网平台包括设备硬件、数据采集、数据存储、分析、Web / 移动应用等。EMQX 位于数据采集这一层,源码管理自动发布分别与硬件和数据存储、分析进行交互,是物联网平台的核心:前端的硬件通过 MQTT 协议与位于数据采集层的 EMQX 交互,通过 EMQX 将数据采集后,通过 EMQX 提供的数据接口,将数据保存到后台的持久化平台中(各种关系型数据库和 NOSQL 数据库),或者流式数据处理框架等,上层应用通过这些数据分析后得到的结果呈现给最终用户。
EMQX 公司主要提供三个产品,可在官网首页产品导航查看每一种产品;主要体现在支持的连接数量、产品功能和商业服务等方面的区别。
完整的 MQTT V3.1/V3.1.1 及 V5.0 协议规范支持;QoS0, QoS1, QoS2 消息支持;持久会话与离线消息支持;Retained 消息支持;Last Will 消息支持;TCP/SSL 连接支持;MQTT/WebSocket/SSL 支持;HTTP 消息发布接口支持;$SYS/# 系统主题支持;客户端在线状态查询与订阅支持;客户端 ID 或 IP 地址认证支持;用户名密码认证支持;LDAP 认证;Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成;浏览器 Cookie 认证;基于客户端 ID、IP 地址、用户名的访问控制 (ACL);多服务器节点集群 (Cluster);支持 manual、mcast、dns、etcd、k8s 等多种集群发现方式;网络分区自动愈合;消息速率限制;连接速率限制;按分区配置节点;多服务器节点桥接 (Bridge);MQTT Broker 桥接支持;Stomp 协议支持;MQTT-SN 协议支持;CoAP 协议支持;Stomp/SockJS 支持;延时 Publish ($delay/topic);Flapping 检测;黑名单支持;共享订阅 ($share/:group/topic);TLS/PSK 支持;规则引擎;空动作 (调试);消息重新发布;桥接数据到 MQTT Broker;检查 (调试);发送数据到 Web 服务。
EMQ X 目前支持的操作系统:Centos6、Centos7、OpenSUSE tumbleweed、Debian 8、Debian 9、Debian 、Ubuntu .、Ubuntu .、Ubuntu .、macOS .、macOS .、macOS .、Windows Server 。产品部署建议 Linux 服务器,不推荐 Windows 服务器。安装的方式有很多种,可供自由选择:Shell脚本安装、包管理器安装、二进制包安装、ZIP压缩包安装、Homebrew安装、Docker运行安装、Helm安装、源码编译安装。
Dashboard界面查看基本信息。
身份认证是大多数应用的重要组成部分,MQTT 协议支持用户名密码认证,启用身份认证能有效阻止非法客户端的连接。EMQ X 中的认证指的是当一个客户端连接到 EMQ X 的时候,通过服务器端的配置来控制客户端连接服务器的权限。EMQ X 的认证支持包括两个层面:MQTT 协议本身在 CONNECT 报文中指定用户名和密码,EMQ X 以插件形式支持基于 Username、ClientID、HTTP、JWT、LDAP 及各类数据库如 MongoDB、MySQL、PostgreSQL、Redis 等多种形式的认证;在传输层上,TLS 可以保证使用客户端证书的客户端到服务器的身份验证,并确保服务器向客户端验证服务器证书。也支持基于 PSK 的 TLS/DTLS 认证。
EMQ X 支持使用内置数据源(文件、内置数据库)、JWT、外部主流数据库和自定义 HTTP API 作为身份认证数据源。连接数据源、进行认证逻辑通过插件实现的,每个插件对应一种认证方式,使用前需要启用相应的插件。客户端连接时插件通过检查其 username/clientid 和 password 是否与指定数据源的信息一致来实现对客户端的身份认证。(v5.0以上默认集成)EMQ X 支持的认证方式:内置数据源、外部数据库、其他。认证结果:认证成功、认证失败、忽略认证(ignore)。
EMQ X 默认配置中启用了匿名认证,任何客户端都能接入 EMQ X。没有启用认证插件或认证插件没有显式允许/拒绝(ignore)连接请求时,EMQ X 将根据匿名认证启用情况决定是否允许客户端连接。
可以订阅多个主题。
安装 paho-mqtt:导入 Paho MQTT 客户端。
通过TCP连接:设置 broker、port、topic、client_id,连接 MQTT Broker。
通过SSL/TLS连接:设置 broker、port、topic、client_id,连接 MQTT Broker,使用 CA certificate,设置用户名密码。
订阅主题:设置 on_message 回调函数,当收到消息时执行。
取消订阅:通过以下代码取消订阅,此时应指定取消订阅的主题。
发布消息:通过以下代码发布消息,设置消息内容、主题,调用 publish 方法。
接收消息:通过以下代码指定客户端对消息事件进行监听,并在收到消息后执行回调函数,将接收到的消息及其主题打印到控制台。
断开连接:如客户端希望主动断开连接,可以通过如下代码实现。
完整代码:导入 random、time、paho.mqtt.client as mqtt_client,设置 broker、port、topic、client_id,连接 MQTT Broker,设置 on_connect 回调函数,设置 publish 回调函数,运行客户端。
通过transmittable-thread-local源码理解线程池线程本地变量传递的原理
最近几周,我投入了大量的时间和精力,完成了UCloud服务和中间件迁移至阿里云的工作,因此没有空闲时间撰写文章。不过,回忆起很早之前对ThreadLocal源码的分析,其中提到了ThreadLocal存在向预先创建的线程中传递变量的局限性。恰好,我的一位前同事,HSBC的技术大牛,提到了团队引入了transmittable-thread-local(TTL)来解决此问题。借此机会,我深入分析了TTL源码,本文将全面分析ThreadLocal和InheritableThreadLocal的局限性,并深入探讨TTL整套框架的实现。如有对线程池和ThreadLocal不熟悉的读者,建议先阅读相关前置文章,本篇文章行文较为干硬,字数接近5万字,希望读者耐心阅读。
在Java中,没有直接的API允许子线程获取父线程的实例。获取父线程实例通常需要通过静态本地方法Thread#currentThread()。同样,为了在子线程中传递共享变量,也常采用类似的方法。然而,这种方式会导致硬编码问题,限制了方法的复用性和灵活性。为了解决这一问题,线程本地变量Thread Local应运而生,其基本原理是通过线程实例访问ThreadLocal.ThreadLocalMap来实现变量的存储与传递。
ThreadLocal与InheritableThreadLocal之间的区别主要在于控制ThreadLocal.ThreadLocalMap的创建时机和线程实例中对应的属性获取方式。通过分析源码,可以清楚地看到它们之间的联系与区别。对于不熟悉概念的读者,可以尝试通过自定义实现来理解其中的原理与关系。
ThreadLocal和InheritableThreadLocal的最大局限性在于无法为预先创建的线程实例传递变量。泛线程池Executor体系、TimerTask和ForkJoinPool等通常会预先创建线程,因此无法在这些场景中使用ThreadLocal和InheritableThreadLocal来传递变量。
TTL提供了更灵活的解决方案,它通过委托机制(代理模式)实现了变量的传递。委托可以基于Micrometer统计任务执行时间并上报至Prometheus,然后通过Grafana进行监控展示。此外,TTL通过字节码增强技术(使用ASM或Javassist等工具)实现了类加载时期替换Runnable、Callable等接口的实现,从而实现了无感知的增强功能。TTL还使用了模板方法模式来实现核心逻辑。
TTL框架的核心类TransmittableThreadLocal继承自InheritableThreadLocal,通过全局静态变量holder来管理所有TransmittableThreadLocal实例。holder实际上是一个InheritableThreadLocal,用于存储所有线程本地变量的映射,实现变量的全局共享。disableIgnoreNullValueSemantics属性的设置可以影响NULL值的处理方式,影响TTL实例的行为。
发射器Transmitter是TransmittableThreadLocal的一个公有静态类,提供传输TransmittableThreadLocal实例和注册当前线程变量至其他线程的功能。通过Transmitter的静态方法,可以实现捕获、重放和复原线程本地变量的功能。
TTL通过TtlRunnable类实现了任务的封装,确保在执行任务时能够捕获和传递线程本地变量。在任务执行前后,通过capture和restore方法捕获和重放变量,实现异步执行时上下文的传递。
启用TTL的Agent模块需要通过Java启动参数添加javaagent来激活字节码增强功能。TTL通过Instrumentation回调激发ClassFileTransformer,实现目标类的字节码增强,从而在执行任务时自动完成上下文的捕捉和传递。
TTL框架提供了一种高效、灵活的方式来解决线程池中线程复用时上下文传递的问题。通过委托机制和字节码增强技术,TTL实现了无入侵地提供线程本地变量传递功能。如果您在业务代码中遇到异步执行时上下文传递的问题,TTL库是一个值得考虑的解决方案。
2024-12-28 23:23
2024-12-28 23:12
2024-12-28 22:37
2024-12-28 22:34
2024-12-28 21:47