1.linkedqueueԴ??
2.从源码全面解析 LinkedBlockingQueue的来龙去脉
3.深入理解 RxJava2:Scheduler(2)
4.concurrentlinkedqueue的实现原理是什么?
5.java并发编程之深入学习Concurrent包(十一,ConcurrentLinkedQueue类)
6.Java并发编程笔记之LinkedBlockingQueue源码探究
linkedqueueԴ??
引言
本文将详细解读Java中常见的5种BlockingQueue阻塞队列,包括它们的优缺点、区别以及典型应用场景,以帮助深入理解这5种队列的独特性质和使用场合。
常见的des cbc c源码BlockingQueue有以下5种:
1. **基于数组实现的阻塞队列**:创建时需指定容量大小,是有限队列。
2. **基于链表实现的阻塞队列**:默认无界,可自定义容量。
3. **无缓冲阻塞队列**:生产的数据需立即被消费,无缓冲。
4. **优先级阻塞队列**:支持元素按照大小排序,无界。
5. **延迟阻塞队列**:基于PriorityQueue实现,无界。
**BlockingQueue简介
**BlockingQueue作为接口,定义了放数据和取数据的多组方法,适用于并发多线程环境,特别适合生产者-消费者模式。
**应用场景
**BlockingQueue的作用类似于消息队列,用于解耦、异步处理和削峰,适用于线程池的核心功能实现。
**区别与比较
**- **ArrayBlockingQueue**:基于数组实现,容量可自定义。
- **LinkedBlockingQueue**:基于链表实现,无界或自定义容量。
- **SynchronousQueue**:同步队列,生产者和消费者直接交互,无需缓冲。
- **PriorityBlockingQueue**:实现优先级排序,无界队列。
- **DelayQueue**:本地延迟队列,支持元素延迟执行。
在选择使用哪种队列时,需考虑具体任务的特性、吞吐量需求以及是否需要优先级排序或延迟执行。
本文旨在提供全面理解Java中BlockingQueue的指南,从源码剖析到应用场景,帮助开发者更好地应用这些工具于实际项目中。
从源码全面解析 LinkedBlockingQueue的来龙去脉
并发编程是互联网技术的核心,面试官常在此领域对求职者进行深入考察。为了帮助读者在面试中占据优势,本文将解析 LinkedBlockingQueue 的工作原理。
阻塞队列是并发编程中常见的数据结构,它在生产者和消费者模型中扮演重要角色。生产者负责向队列中添加元素,而消费者则从队列中取出元素。LinkedBlockingQueue 是 Java 中的一种高效阻塞队列实现,它底层基于链表结构。
在初始化阶段,LinkedBlockingQueue 不需要指定队列大小。除了基本成员变量,它还包含两把锁,分别用于读取和写入操作。有读者疑惑,为何需要两把锁,而其他队列只用一把?本文后续将揭晓答案。
生产者使用 `add()`、`offer()`、`offer(time)` 和 `put()` 方法向队列中添加元素。消费者则通过 `remove()`、`poll()`、`poll(time)` 和 `take()` 方法从队列中获取元素。
在解析源码时,发现 LinkedBlockingQueue 与 ArrayBlockingQueue 在锁的使用上有所不同。ArrayBlockingQueue 使用互斥锁,而 LinkedBlockingQueue 使用读锁和写锁。这是否意味着 ArrayBlockingQueue 可以使用相同类型的锁?答案是肯定的,且使用两把锁的 ArrayBlockingQueue 在性能上有所提升。
流程图展示了 LinkedBlockingQueue 和 ArrayBlockingQueue 之间的相似之处。有兴趣的读者可以自行绘制。
总结而言,LinkedBlockingQueue 是文章列表源码一种高效的阻塞队列实现,其底层结构基于链表。它通过读锁和写锁管理线程安全,为生产者和消费者提供了并发支持。通过优化锁的使用,LinkedBlockingQueue 在某些场景下展现出更好的性能。
互联网寒冬虽在,但学习和分享是抵御寒冬的最佳方式。通过交流经验,可以减少弯路,提高效率。如果你对后端架构和中间件源码感兴趣,欢迎与我交流,共同进步。
深入理解 RxJava2:Scheduler(2)
欢迎来到深入理解 RxJava2 系列第二篇,本文基于 RxJava 2.2.0 正式版源码,将探讨 Scheduler 与 Worker 的概念及其实现原理。
Scheduler 与 Worker 在 RxJava2 中扮演着至关重要的角色,它们是线程调度的核心与基石。虽然 Scheduler 的作用较为熟悉,但 Worker 的概念了解的人可能较少。为何在已有 Scheduler 的情况下,还要引入 Worker 的概念呢?让我们继续探讨。
首先,Scheduler 的核心定义是调度 Runnable,支持立即、延时和周期性调用。而 Worker 是任务的最小单元的载体。在 RxJava2 内部实现中,通常一个或多个 Worker 对应一个 ScheduledThreadPoolExecutor 对象,这里暂不深入探讨。
在 RxJava 1.x 中,Scheduler 没有 scheduleDirect/schedulePeriodicallyDirect 方法,只能先创建 Worker,再通过 Worker 来调度任务。这些方法是对 Worker 调度的简化,可以理解为创建一个只能调度一次任务的 Worker 并立即调度该任务。在 Scheduler 基类的源码中,默认实现是直接创建 Worker 并创建对应的 Task(虽然在部分 Scheduler 的覆盖实现上并没有创建 Worker,但可以认为存在虚拟的 Worker)。
一个 Scheduler 可以创建多个 Worker,这两者是一对多的关系,而 Worker 与 Task 也是一对多的关系。Worker 的存在旨在确保两件事:统一调度 Runnable 和统一取消任务。例如,在 observeOn 操作符中,可以通过 Worker 来统一调度和取消一系列的 Runnable。
RxJava2 默认内置了多种 Scheduler 实现,适用于不同场景,这些 Scheduler 都可以在 Schedulers 类中直接获得。以下是两个常用 Scheduler 的源码分析:computation 和 io。
NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。
ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。
有了这个类,RxJava2 在实现 Worker 时就站在了巨人的肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。
ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,jap登录源码CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。
FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。
PoolWorker 的使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:
为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。
与 ComputationScheduler 恰恰相反,IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。
CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。
在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。
IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,其被实际清理的延迟在 - 秒之间。
熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。
Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。
本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。
感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。
concurrentlinkedqueue的实现原理是什么?
在并发场景下,高性能的并发容器ConcurrentLinkedQueue在实现上采用了链表结构。它的设计思想旨在优化并发性能,避免了锁的使用,采用 CAS 和 volatile 关键字来保证线程安全。ConcurrentLinkedQueue 的设计包含以下几个关键点:
1. **数据结构**:它由单向链表实现,使用 volatile 标记关键字段,确保读取操作的libgd源码下载可见性。这些关键字段包括记录队首和队尾的节点。
2. **设计思想**:采用延迟更新首尾节点的思想,减少 CAS 的开销,优化并发性能。首尾节点并不总是最新的节点,这有助于减少 CAS 操作的频率。
3. **哨兵节点**:引入哨兵节点(虚拟节点)简化代码逻辑,特别是在链表操作中,它可以避免特殊处理第一个节点的情况,同时在只有一个节点时减少并发冲突。
4. **源码实现**:
- **offer**:入队操作涉及变量 t、p 和 q,用于迭代和 CAS 操作。入队分为几种情况,每种情况对应不同的操作逻辑。
- **poll**:出队操作类似,涉及变量 h、p 和 q,逻辑与入队相反,用于删除队首节点。
5. **流程图实现**:通过流程图直观地展示了入队和出队的详细步骤,包括节点的添加、更新以及哨兵节点的使用,有助于理解实际操作流程。
6. **总结**:ConcurrentLinkedQueue 的实现通过优化数据结构和操作逻辑,利用 CAS 和 volatile 关键字,提供了高效且线程安全的并发容器。其适用于数据量大、并发量高、频繁读写操作的场景,特别在队列的头部和尾部操作中表现优异。
要理解 ConcurrentLinkedQueue 的实现原理,关键是掌握并发编程基础、链表操作和 CAS 技术。通过分析其源代码和流程图,可以深入理解其高效并发性能的来源。
java并发编程之深入学习Concurrent包(十一,ConcurrentLinkedQueue类)
深入学习ConcurrentLinkedQueue类,了解它作为非阻塞队列实现,采用链表形式构建的容器。
ConcurrentLinkedQueue类遵循非阻塞算法,通过原子指令CAS(Compare and Swap)取代同步阻塞锁,以确保在并发访问下数据的一致性,并显著提升同步性能。根据Amdahl定律,最小化串行代码的粒度是提升并发性能的关键,ConcurrentLinkedQueue类的实现正是如此。它不严格保证链表头尾的一致性,而是通过CAS操作来确保新节点的插入和头尾节点的更新,实现高效并发。
在非阻塞队列的操作中,通常需要原子化执行的两个步骤被分离,即插入新节点的入队和出队操作,与头尾节点的更新并非同步。这减少了需要原子更新的值范围,仅涉及唯一的变量,从而提升了非阻塞队列操作的性能,这也是Amdahl定律的体现。
ConcurrentLinkedQueue类的源代码中,节点类Node实现了这一设计,其item和next域被声明为普通的volatile类型,并使用AtomicReferenceFieldUpdater来更新。通过这种方式,能够实现高效、并发的节点操作。
节点类型被分为有效节点(item不为null)、无效节点(item为null)和已删除节点(通过next链接到自身),其中,头节点是队列中的第一个有效节点,而尾节点是next为null的节点,注意这不一定是tail指向的节点。
队列的初始化通过创建一个head和tail共同指向,item及next都为null的源码站点初始队列来实现。
入队操作涉及将新节点插入到尾节点的后面,通过tail找到尾节点执行插入操作。如果插入不成功,会继续向后推进查找。这一过程确保了高效并发的实现。
出队操作则涉及从头节点开始,循环查找下一个节点,直到找到满足条件的节点为止。一旦找到满足条件的节点,则更新头节点,并返回该节点的item值。
当遍历过程已越过一个节点时,会寻找下一个节点。如果head的next等于head,则意味着到达了哨兵节点,此时下一节点从head重新开始查找。
综上所述,ConcurrentLinkedQueue类通过非阻塞算法和高效的设计,提供了一个高性能的并发队列实现,适用于需要高并发访问场景的应用。
Java并发编程笔记之LinkedBlockingQueue源码探究
LinkedBlockingQueue 是基于单向链表实现的一种阻塞队列,其内部包含两个节点用于存放队列的首尾,并维护了一个表示元素个数的原子变量 count。同时,它利用了两个 ReentrantLock 实例(takeLock 和 putLock)来保证元素的原子性入队与出队操作。此外,notEmpty 和 notFull 两个信号量与条件队列用于实现阻塞操作,使得生产者和消费者模型得以实现。
LinkedBlockingQueue 的实现主要依赖于其内部锁机制和信号量管理。构造函数默认容量为最大整数值,用户可自定义容量大小。offer 方法用于尝试将元素添加至队列尾部,若队列未满则成功,返回 true,反之返回 false。若元素为 null,则抛出 NullPointerException。put 方法尝试将元素添加至队列尾部,并阻塞当前线程直至队列有空位,若被中断则抛出 InterruptedException。通过使用 putLock 锁,确保了元素的原子性添加以及元素计数的原子性更新。
在实现细节上,offer 方法通过在获取 putLock 的同时检查队列是否已满,避免了不必要的元素添加。若队列未满,则执行入队操作并更新计数器,同时考虑唤醒等待队列未满的线程。此过程中,通过 notFull 信号量与条件队列协调线程间等待与唤醒。
put 方法则在获取 putLock 后立即检查队列是否满,若满则阻塞当前线程至 notFull 信号量被唤醒。在入队后,更新计数器,并考虑唤醒等待队列未满的线程,同样通过 notFull 信号量实现。
poll 方法用于从队列头部获取并移除元素,若队列为空则返回 null。此方法通过获取 takeLock 锁,保证了在检查队列是否为空和执行出队操作之间的原子性。在出队后,计数器递减,并考虑激活因调用 poll 或 take 方法而被阻塞的线程。
peek 方法类似,但不移除队列头部元素,返回 null 若队列为空。此方法也通过获取 takeLock 锁来保证操作的原子性。
take 方法用于阻塞获取队列头部元素并移除,若队列为空则阻塞当前线程直至队列不为空。此方法与 put 方法类似,通过 notEmpty 信号量与条件队列协调线程间的等待与唤醒。
remove 方法用于移除并返回指定元素,若存在则返回 true,否则返回 false。此方法通过双重加锁机制(fullyLock 和 fullyUnlock)来确保元素移除操作的原子性。
size 方法用于返回当前队列中的元素数量,通过 count.get() 直接获取,确保了操作的准确性。
综上所述,LinkedBlockingQueue 通过其独特的锁机制和信号量管理,实现了高效、线程安全的阻塞队列操作,适用于生产者-消费者模型等场景。
我怀疑这是IDEA的BUG,但是我翻遍全网没找到证据!
分享一个关于IDEA的有趣问题。最近,有朋友在使用Lombok的@Data注解时遇到了奇怪的现象,代码中一个布尔类型赋值给整型,居然没有报错。他将问题发给了我,我一开始也觉得不可思议。经过研究,我发现原因可能出在IDEA上,而并非Lombok插件本身。
为了验证我的猜想,我在本地环境中复现了问题。在源文件中,我只添加了@Data注解。经过编译,我发现Lombok自动为我们生成了无参构造函数、getter和setter方法、equals和hashCode方法等。这让我意识到@Data注解实际上是一个复合注解,包含了多个功能。
在深入研究中,我发现真正生成hashCode方法的注解应该是@EqualsAndHashCode。为了排除干扰,我将@Data注解替换为@EqualsAndHashCode。结果,生成的方法确实少了,而且我不关心这些方法。观察到hashCode方法的第一行代码是int PRIME = true;,我意识到这里可能存在问题。
通过使用反编译工具jd-gui和查看字节码,我发现hashCode方法的实现与预期不符。在jd-gui中,我看到的hashCode方法的第一个命令使用的是整型入栈指令,值为,而不是true。这个PRIME变量似乎没有被实际使用,这个问题暂且搁置。
在查看字节码时,我注意到hashCode方法的实现是通过整型入栈指令bipush生成的,值为。经过验证,我有理由怀疑IDEA在显示int PRIME = true时存在BUG。
尽管我在网络上进行了深入搜索,但并未找到与此问题相关的详细资料。我尝试了多种搜索策略,包括使用jd-gui工具进行反编译和直接查看字节码。虽然我未能找到权威证据证明这是IDEA的BUG,但基于上述发现,我确信这是IDEA的一个问题。
这个发现为我提供了丰富的素材,我感到非常兴奋。尽管没有找到直接的权威证据,但我的分析和验证过程让我确信这是一个值得记录的问题。关于这个现象背后的原因,我在网上也找到了一些线索,包括关于常量折叠的解释和Lombok源代码中的相关提交记录。
在深入探讨IDEA的BUG时,我还提到了另一个案例,即IDEA在Debug模式下对ConcurrentLinkedQueue的处理方式可能导致空指针异常。这个问题最终被确认为IDEA的特性,并提供了关闭相关配置的解决方案。
总的来说,这篇文章分享了我对这个问题的探索过程、发现的线索以及最后的分析结果。尽管没有找到绝对的证据,但基于我的研究和分析,我确信IDEA在这特定情况下存在BUG。
死磕 java集合之ConcurrentLinkedQueue源码分析
ConcurrentLinkedQueue
(1)不是阻塞队列
(2)通过CAS+自旋保证并发安全
(3)可用于多线程环境,但不能用在线程池中
简介
主要属性
两个属性:头节点与尾节点
主要内部类
典型单链表结构
主要构造方法
构造简单,实现无界单链表队列
入队
add(e)与offer(e)方法
无异常抛出,流程清晰
出队
remove()与poll()方法
逻辑清晰,不阻塞线程
总结
非阻塞队列,不适用于线程池
彩蛋
与LinkedBlockingQueue对比
线程安全与返回null特性相似
效率与锁机制差异显著
无法实现等待元素与用在线程池中的限制
PolarDB-X 源码解读(七):私有协议连接的一生(CN篇)
通过前文的介绍,大家基本了解了一条SQL在polardbx-sql中的解析和执行流程。由于polardbx-sql是无状态的计算节点,真正数据需要从存储节点传输到计算节点,这部分工作由私有协议完成。本文将详细介绍从发送请求到存储节点,接收返回数据的完整流程,重点在于私有协议连接的生命周期和关键代码解析。
概述
为了提高数据节点本地计算能力,同时减少网络数据传输量,计算节点会尽可能下推计算内容。一个逻辑表可能需要多个物理分片,因此计算节点与存储节点的请求会话数量会随着分片数增加而增加。传统MySQL协议+连接池架构已不能满足PolarDB-X的需求,因此私有协议在这一需求场景下应运而生。
如图所示,私有协议采用连接与会话分离的RPC协议设计理念,支持多个会话在同一个TCP通道中并行运行,具备流控机制、全双工响应式工作模式和高吞吐、可扩展等特性。
更多关于私有协议解决上述问题的设计详情,可以参考《PolarDB-X私有协议设计》一文。本文主要从代码层面详细描述私有协议的工作流程。
我们将从计算节点和存储节点两个角度完整解析私有协议连接的生命周期。篇幅限制,本文仅关注计算节点上私有协议的处理,存储节点部分将在后续文章中详细说明。
计算节点
计算节点作为私有协议的客户端,负责发送下推请求,并接收返回的数据。
网络层框架
PolarDB-X私有协议网络层采用定制化Reactor框架实现,基于Java的NIO,改进自polardbx-sql中的Reactor框架。网络层初始化时,设置CPU核心数的2倍(上限为)作为NIOProcessor,每个Reactor使用独立的堆外内存池作为收发包缓冲,总缓冲内存大小限制为堆内存大小的%。
NIO接收的包直接调用注册的处理函数,发送数据仅写入send buf,网络写入由单独线程完成。线程优先写入TCP send buf,当无法写入时,注册OP_WRITE事件等待可写后再写入剩余内容。
数据包的编码和解码在NIOClient中实现。为实现最佳性能,解包流程直接在堆外内存上进行,使用protobuf对流直接解析,将结果放入堆内。堆外内存被切分为KB chunk,每个Reactor独占一个chunk,连续解析和复用,最大化接收、解析效率。对于特大包,额外构造堆内大buffer接收和解析,回退标志在定时任务中重置,连续s无超大包时释放堆内内存,恢复高性能堆外KB buffer接收。
请求发送集成在NIOClient中,writer优先尝试写入发送缓冲队列尾部的buffer,不足时新申请buffer填充并追加到队尾。buffer来自预分配的堆外缓冲池,超过chunk大小时分配堆内buf进行序列化。
同时,NIOClient负责TCP连接的建立和断开资源释放,作为独立的底层网络资源管理实现。
连接及会话
网络层之后,我们聚焦连接与会话分离的具体实现。通过剥离连接及收发包的具体实现,连接和会话的管理变得更加清晰简洁。
首先,一个TCP连接的逻辑抽象结构在XClient中实现,为避免误解,取名为client与JDBC中的Connection区别。该类管理TCP连接和并行运行的会话,负责TCP完整生命周期的管理、认证鉴权,并维护公共信息。其中,workingSessionMap记录了连接上并行运行的所有会话映射关系,可快速通过会话ID找到对应的会话抽象结构XSession。
XSession提供了所有会话相关的请求函数和信息存储,包括执行计划请求、SQL查询请求、SQL更新请求、TSO请求、会话变量处理、数据包处理及异步唤醒等。
连接池及全局单例管理器
为了提高性能,TCP连接和会话的复用必不可少。由于连接和会话的解绑,连接池不仅缓存了到计算节点的TCP连接,也缓存了到计算节点的会话。
XClientPool管理到一个存储节点的连接池,通过IP,端口,用户名三元组唯一确定目标存储节点,同时存储该节点的全部TCP连接(XClient)和建立的会话(XSession)。
XClientPool实现存储节点会话获取,对应JDBC接口中的getConnection,同时实现连接和会话生命周期管理、连接探活、会话预分配等功能。实现单个存储节点连接池后,XConnectionManager维护目标存储节点三元组到实例连接池的映射,管理定时任务线程池,实现定时探活、会话&连接最长生命控制以及连接池预热等功能。
JDBC兼容层
新的SQL协议层对上层使用者要求较高,为了提高开发效率,私有协议提供兼容JDBC的使用方法,实现从JDBC平滑切换至私有协议,并支持协议热切换。
JDBC兼容层代码目录在compatible目录下,Connection继承在XConnection文件中。提供包括DataSource、Connection、Statement、PreparedStatement、ResultSet、ResultSetMetaData在内的大部分常用接口函数实现,不支持的函数会明确抛出异常避免误用。
整体关系
至此,私有协议计算节点端的大部分结构已说明完成。给出一个整体的关系图。
私有协议连接的一生(CN视角)
了解了私有协议各层实现后,我们以发到存储节点的请求为例,完整梳理执行流程。绕开计算节点复杂流程,直接运行代码示例(注:需将com.alibaba.polardbx.rpc.XConfig#GALAXY_X_PROTOCOL设置为true)。
直接运行playground看到预期的select 1的结果。接下来,我们深入跟踪说明。
数据源初始化
要使用私有协议,需要初始化对应存储节点的XDataSource。构造过程中,XDataSource会到XConnectionManager注册新的实例连接池,已存在的连接池引用计数加一。
获取Connection
当需要执行查询时,首先获取会话。无论是显式开启事务还是使用auto commit事务,会话都是执行请求的最小上下文。通过XDataSource的getConnection方法获取到对应存储节点的会话。XDataSource根据存储的IP,端口,用户名三元组查找到XConnectionManager中的连接池,在最高并发检查后,会话获取逻辑在XClientPool实现。首先尝试在空闲会话池中拿会话,通过重置检查和初始化后返回给调用者。大部分场景下,ConcurrentLinkedQueue提供较好的并发性能。
在代码场景下,数据源刚新建,后台定时任务未运行,流程进入连接创建流程。会有一把大锁锁住连接池,在TCP连接未达上限且没有超时的情况下,快速新建一个XClient占坑。若超限,则进入busy waiting循环。真正的TCP connect(waitChannel)在锁外被调用,首先client以阻塞模式带超时方式connect,然后切换为非阻塞模式,round robin策略注册到NIOProcesser上,返回时,TCP连接已建立。
为了兼顾安全和性能,连接鉴权在TCP建连后只用做一次,会话创建不需要鉴权。鉴权在initClient中完成,发送SESS_AUTHENTICATE_START_VALUE包,后续校验由回调完成。认证采用标准的MySQL认证流程,server端返回challenge值,库名、用户名和加盐hash后的密码返回给MySQL即可完成认证。
至此,到存储节点的TCP连接已建立,创建会话是一个异步流程。在创建新XClient时,XConnection已new好,通过下断点跟进去可看到newXSession流程,分配session id,设置状态为init,将XSession绑定到XConnection上。
最后,XConnection经过初始化(重置auto commit状态)、重置默认DB、默认字符集(lazy操作)和统计信息记录,返回给用户使用。
发送查询请求
拿到初始化好的兼容JDBC的Connection,为了简化流程,直接调用XConnection中的execQuery。XConnection的execQuery包装了XSession的execQuery,执行前执行了设置流式模式。
首先记录调用信息进行统计,进入关键的initForRequest流程。XSession初始化流程lazy,仅分配session id,设置状态为Init,真正创建session时发送SESS_NEW给server,绑定新session和session id。如果session已复用,则状态为Ready。
执行字符集更改的lazy操作,session可能在其他请求中切换字符集,根据目标字符集和当前字符集对比,决定是否发送额外的字符集更改请求。
经过一系列变量设置、lazy DB设置和protobuf包构造,请求发送到存储节点执行。发送后,同步生成XResult负责结果解析,同时XResult按照请求顺序依次拉链表,确保结果与请求一一对应。
请求流水线结构如下图所示,处理完成前序请求后,才能解析后续结果。
接收结果集
请求已发送到存储节点执行,拿到XResult,通过XResult收集查询结果集。XResult与发送请求一一对应,存储节点处理也是在会话上排队进行,不会影响流水线上其他请求的返回,保证流水线正常工作。
首先,查看结果集处理的状态机,主要状态包括获取元数据、获取数据行、获取额外信息等,顺序固定,根据请求类型,部分环节可能被省略。报错处理贯穿整个状态机,任何报错信息都会导致状态机进入错误处理环节。
对于非流式数据读取,请求结束时主动调用finishBlockMode将所有数据读出并缓存到rows中。对于流式执行的情况,结果集状态机消费数据包队列由XResult的next函数推动,内部函数internalFetchOneObject递归调用前序XResult,消费前序请求结果,从数据包队列中消费并推动状态机流转。
对于查询,首先收到RESULTSET_COLUMN_META_DATA包,表示返回数据列定义,一个包表示一列。元数据包后,收到包含数据行的RESULTSET_ROW包,一个包对应一行。数据行传输完成后,server端发送RESULTSET_FETCH_DONE标示数据发送完成。请求结束前,NOTICE包用于告知客户端rows affected等信息。最后,SQL_STMT_EXECUTE_OK包标示请求结束。
至此,完整请求处理完成,控制台应显示查询结果。
总结
本文详细描述了私有协议连接流程中的关键点和关键数据结构,相信通过本文描述,大家掌握了私有协议连接流程的基本点,在调试和修改使用中能够更加得心应手。虽然本文篇幅较长,但实际使用中涉及更多高级特性的使用,如多请求流水线、流控、执行计划传输、chunk结果集传输等。通过本文,我们对私有协议连接流程有了深入理解,为在实际场景中应用提供坚实基础。