1.Flink深入浅出:JDBC Connector源码分析
2.数据库:JDBC详解
3.<url>jdbc:oracle:thin:@localhost:1521:orcl</url>
4.JDBC连接参数设置对Oracle数据库的源码影响分析
5.有趣的 Oracle JDBC 驱动包命名问题 - ojdbc6 和 ojdbc14 哪个新?!
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是分析日常工作中最常遇到的任务。通常,源码我们通过读取Hive数据来进行计算,分析并将结果保存到数据库中,源码然后通过前端读取数据库来进行报表展示。分析libjson源码然而,源码使用FlinkSQL可以简化这一过程,分析通过一个SQL语句即可完成整个ETL流程。源码
在Flink中,分析读取Hive数据并将数据写入数据库是源码常见的需求。本文将重点讲解数据如何写入数据库的分析过程,包括刷写数据库的源码机制和原理。
以下是分析本文将讲解的几个部分,以解答在使用过程中可能产生的源码疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是羊源码CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的任务网站源码并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
数据库:JDBC详解
1.什么是JDBC
JDBC(Java Data Base Connectivity)是一种Java API,用于执行SQL语句,允许对不同关系数据库进行统一访问。它由一组Java编写的网上的源码类和接口构成,为数据库开发人员提供了编写数据库应用程序的基础。
2.数据库驱动
安装数据库后,应用程序不能直接使用数据库,需要通过相应的数据库驱动程序。这些驱动程序是数据库厂商的JDBC接口实现,是包含对Connection等接口实现类的jar文件。
二、常用接口
1.Driver接口
Driver接口由数据库厂商提供,Java开发人员只需使用此接口。要连接数据库,需先加载特定厂商的数据库驱动程序,不同数据库有不同的加载方法。例如:
装载MySql驱动:Class.forName("com.mysql.jdbc.Driver");
装载Oracle驱动:Class.forName("oracle.jdbc.driver.OracleDriver");
2.Connection接口
Connection接口表示与特定数据库的连接(会话),在连接上下文中执行SQL语句并返回结果。DriverManager.getConnection(url, user, password)方法基于JDBC URL中的数据库Connection连接。
连接MySql数据库:Connection conn = DriverManager.getConnection("jdbc:mysql://host:port/database", "user", "password");
连接Oracle数据库:Connection conn = DriverManager.getConnection("jdbc:oracle:thin:@host:port:database", "user", "password");
连接SqlServer数据库:Connection conn = DriverManager.getConnection("jdbc:microsoft:sqlserver://host:port; DatabaseName=database", "user", "password");
常用方法:
3.Statement接口
用于执行静态SQL语句并返回结果对象。
三种Statement类:
常用Statement方法:
4.ResultSet接口
ResultSet提供检索不同类型字段的方法,常用方法包括:
ResultSet还提供了对结果集进行滚动的方法:
使用后依次关闭对象及连接:ResultSet → Statement → Connection
三、使用JDBC的步骤
加载JDBC驱动程序 → 建立数据库连接Connection → 创建执行SQL的语句Statement → 处理执行结果ResultSet → 释放资源
1.注册驱动 (只做一次)
方式一:Class.forName("com.MySQL.jdbc.Driver"); 推荐这种方式,不会对具体的驱动类产生依赖。
方式二:DriverManager.registerDriver(com.mysql.jdbc.Driver); 这种方式会造成DriverManager中产生两个一样的驱动,并会对具体的驱动类产生依赖。
2.建立连接
URL用于标识数据库的位置,通过URL地址告诉JDBC程序连接哪个数据库,URL的写法为:
其他参数如:useUnicode=true&characterEncoding=utf8
3.创建执行SQL语句的statement
4.处理执行结果(ResultSet)
5.释放资源
四、事务(ACID特点、隔离级别、源码网站开发提交commit、回滚rollback)
1.批处理Batch
<url>jdbc:oracle:thin:@localhost::orcl</url>
你好:我是新手,这个问题我也刚遇到,大神们请不要喷我装,只是想和菜鸟们一起进步,谢谢。得出几个结论一并告诉你,不要嫌我啰嗦。
<url>jdbc:oracle:thin:@localhost::orcl</url>
1,@后面也可能不是localhost,需要看你的oracle架设在哪个服务器上,这里填写的是oracle所在服务器的IP地址,你如果是装在本机上可以填写localhost或者.0.,如果装在其他机器上就填那台机器的IP地址,比如是局域网里的另一台机器..1.。
2,是端口号。
3,orcl指的是你所用oracle的全局数据库名,就是你装oracle时要你填写的数据库名称。
另外,使用oracle jdbc的时候添加jar包时弄清楚一点,oracle的jdbc jar包与oracle本身自带的JDK版本是一致的,如下对应关系:
classes.jar - for Java 1.2 and 1.3
ojdbc.jar - for Java 1.4 and 1.5
ojdbc5.jar - for Java 1.5
ojdbc6.jar - for Java 1.6
我的开发环境是jdk1.6,MyEclipse6.0,oracle,当时用的是ojdbc6.jar,死活报错说是找不到oracledriver。。。后来终于弄清楚了,用了class.jar就可以了。你如果也遇到这个问题,可以先看看自己的oracle版本,查一下这个版本自带的jdk版本,然后再找对应的jar包,这个jar包可以在oracle的jdbc文件夹的lib文件夹里找到。
希望,以上几点对你有帮助。
JDBC连接参数设置对Oracle数据库的影响分析
毛思平,年IT工作经验,7年数据库管理维护经验,现就职于中国农业银行软开中心,从事数据库应用研究。
近期某环境下系统,出现大面积页面访问缓慢情况,每个页面交易响应时间2-5秒,严重超过平日访问阈值。
经排查分析,问题主要出现在数据库,生成AWR得到C的数据库DBtime每小时采样达到。进一步分析发现主要等待事件library cache lock,library cache:mutex X。
进一步分析发现DB主要耗时在SQL解析及解析失败上。
生成事件,发现数据库日志中有大量解析失败错误,且解析失败的SQL全部带有ROWID列。
将该类SQL反馈研发部门,反馈未在SQL中添加ROWID。那么ROWID是从何而来的呢?查询相关资料最后定位到JDBC连接参数设置不合理。
那么JDBC连接有哪些参数,各个参数作用是什么,参数设置会对SQL解析产生什么样的影响呢?带着这些问题,本文对JDBC参数设置对SQL解析影响做了针对性实验。
重点介绍参数:ResultSetType。ResultSetType的可选值有三个,每个值及对应的作用如下:
测试环境情况如下表:在测试数据库中建表psname,psdept,psobj并插入数据,表结构如下:
在Oracle中每次程序调用前清空数据库缓存(alter system flush shared_pool清空缓存便于观察SQL执行情况),程序调用结束后查询V$SQL视图,查看解析执行的SQL。
参数设置对解析影响如下:
ROWID是Oracle用于查询表中一行记录的地址,一旦查询中出现了SUM,DISTINCT等聚合或去重函数,获取的数据与单行记录地址不再一一对应,因此多表连接、单表查询带聚合函数、数据去重等SQL解析过程不会出现ROWID,如果出现ROWID则会报解析错误。
测试方法:
测试一,单表简单SQL1:select ID,NAME from psname。测试结果如下表,执行过程后台未见解析报错。测试过程中当ResultSetType为TYPE_SCROLL_SENSITIVE时数据库执行会自动加入rowid列。其中C给rowid自动增加一个别名,G则不会。
测试二,单表带聚合函数或去重SQL语句。测试结果显示,设置ResultSetType为TYPE_FORWARD_ONLY,TYPE_SCROLL_INSENSITIVE编译不报错,解析执行正常。设置为TYPE_SCROLL_SENSITIVE时,解析会报错,数据库通过内部纠错机制得到正确SQL再执行。
测试三,表关联查询SQL3。测试结果显示,设置ResultSetType为TYPE_FORWARD_ONLY,TYPE_SCROLL_INSENSITIVE时SQL解析都不会增加ROWID,此两种情况下SQL执行前后完全一致,数据库日志无解析报错输出。
测试四,带条件全列查询SQL4。测试结果显示,设置ResultSetType为TYPE_FORWARD_ONLY,TYPE_SCROLL_INSENSITIVE时SQL解析都不会增加ROWID,此两种情况下SQL执行前后完全一致,数据库日志无解析报错输出。
当ResultSetType等于TYPE_SCROLL_SENSITIVE时,所有SQL编译过程均会在查询结果集前增加ROWID列。当增加ROWID产生语法解析错误时,数据库通过内部纠错算法得到正确SQL再执行。当JDBC参数ResultSetType等于TYPE_FORWARD_ONLY,TYPE_SCROLL_INSENSITIVE时SQL解析都不会增加ROWID,此两种情况下SQL执行前后完全一致,数据库日志无解析报错输出。
有趣的 Oracle JDBC 驱动包命名问题 - ojdbc6 和 ojdbc 哪个新?!
在探索中,我发现了Oracle JDBC驱动包的命名问题,特别是ojdbc6与ojdbc版本之间的混淆。初看起来,这两个版本的命名似乎指向了不同的更新周期,这确实引起了我的注意。
问题现象主要体现在使用sqoop采集oracle数据时的作业失败上。深入分析后,我发现问题源于驱动包的选取。具体来说,问题出在使用了ojdbc版本的驱动包,而非预期的ojdbc6版本。
问题的原因在于,开发团队在配置过程中误选了ojdbc版本的驱动包,而非应使用的ojdbc6版本。这导致sqoop无法正确识别和连接oracle数据库,从而引发了作业失败的现象。
为了解决这一问题,我们采取了相对简单的解决方案。首先,我们从各个节点目录的$SQOOP_HOME/lib中移除了ojdbc.jar文件。随后,上传了正确的ojdbc6.jar文件,问题随之得到了解决。
进一步盘点Oracle JDBC驱动的常见版本及其对应的JDK版本,会发现Oracle对驱动包的更新策略较为明确。通常,新版本的驱动包在功能和性能上会有改进,同时也与更高级的JDK版本兼容。然而,具体到ojdbc6与ojdbc版本,两者之间的命名差异并未明确反映出版本更新的顺序,这可能导致开发者在选择驱动包时产生混淆。
因此,明确Oracle JDBC驱动包的命名逻辑以及它们与JDK版本的对应关系,对于确保数据集成作业的顺利进行至关重要。通过了解这些细节,开发团队可以避免类似的配置错误,确保sqoop等工具能够正确识别和使用oracle数据库的连接驱动,从而避免作业失败的问题。