1.FLINK 部署(阿里云)、品源监控 和 源码案例
2.Flink深入浅出:JDBC Connector源码分析
3.十二、源码flink源码解析-创建和启动TaskManager二
4.详解flink中Look up维表的品源使用
5.Flink源码编译
6.Flink Collector Output 接口源码解析
FLINK 部署(阿里云)、监控 和 源码案例
FLINK部署、源码监控与源码实例详解
在实际部署FLINK至阿里云时,品源POM.xml配置是源码源码163一个关键步骤。为了减小生产环境的品源包体积并提高效率,我们通常选择将某些依赖项设置为provided,源码确保在生产环境中这些jar包已预先存在。品源而在本地开发环境中,源码这些依赖需要被包含以支持测试。品源 核心代码示例中,源码数据流API的品源运用尤其引人注目。通过Flink,源码我们实现了从Kafka到Hologres的品源高效数据流转。具体步骤如下:Kafka配置:首先,确保Kafka作为数据源的配置正确无误,包括连接参数、主题等,这是整个流程的开端。
Flink处理:Flink的数据流API在此处发挥威力,它可以实时处理Kafka中的数据,执行各种复杂的数据处理操作。
目标存储:数据处理完成后,Flink将结果无缝地发送到Hologres,作为最终的数据存储和分析目的地。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的wiki文档源码疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的icp系统源码并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
十二、flink源码解析-创建和启动TaskManager二
深入探讨Flink源码中创建与启动TaskManager的过程,我们首先聚焦于内部启动onStart阶段。此阶段核心在于启动TaskExecutorServices服务,具体步骤包括与ResourceManager的连接、注册和资源分配。
当TaskExecutor启动时,首先生成新的注册并创建未完成的future,随后等待注册成功并执行注册操作。这一过程由步骤1至步骤5组成,确保注册与资源连接的无缝集成。一旦注册成功,资源管理器会发送SlotReport报告至TaskExecutor,然后分配slot。
TaskSlotTable开始分配slot,JobTable获取并提供slot至JobManager。这一流程确保资源的idea 源码阅读有效分配与任务的高效执行。与此同时,ResourceManager侧的TaskExecutor注册流程同样重要,包括连接与注册TaskExecutor。
一旦完成注册与资源分配,ResourceManager会发送SlotReport报告至JobMaster,提供slot以供调度任务。这一步骤标志着slot的分配与JobManager的准备工作就绪,为后续任务部署打下基础。
在ResourceManager侧,slot管理组件注册新的taskManager,根据规则更新slot状态、释放资源或继续执行注册。这一过程确保资源的高效管理与任务的顺利进行。
在JobMaster侧,slot的分配与管理通过slotPool进行,确保待调度任务能够得到所需资源。这一阶段标志着任务调度与执行的准备就绪。
流程的最后,回顾整个创建与启动TaskManager的过程,从资源连接与注册到slot分配与任务调度,各个环节紧密相连,确保Flink系统的高效运行与任务的顺利执行。
详解flink中Look up维表的使用
背景
在流式计算领域,维表是一种常用概念,主要用于SQL的JOIN操作,以实现对流数据的补充。比如,我们的数据源stream是订单日志,日志中仅记录了订单商品的ID,缺乏其他信息。但在数据分析时,我们需要商品名称、价格等详细信息,这时可以通过查询维表对数据进行补充。
维表通常存储在外部存储中,如MySQL、HBase、Redis等。本文以MySQL为例,介绍Flink中维表的sklearn 源码剖析使用。
LookupableTableSource
Flink提供LookupableTableSource接口,用于实现维表功能。通过特定的key列查询外部存储,获取相关信息,以补充stream数据。
LookupableTableSource有三个方法
在Flink中,实现LookupableTableSource接口的主要有四个类:JdbcTableSource、HBaseTableSource、CsvTableSource和HiveTableSource。本文以JDBC为例,讲解如何进行维表查询。
实例讲解
以下是一个示例,首先定义stream source,使用Flink 1.提供的datagen生成数据。
我们模拟生成用户数据,范围在1-之间。
datagen具体的使用方法请参考:
聊聊Flink 1.中的随机数据生成器-DataGen connector
然后创建一个MySQL维表信息:
该MySQL表中样例数据如下:
最后执行SQL查询,流表关联维表:
结果示例如下:
对于维表中存在的数据,已关联出来,对于维表中不存在的数据,显示为null。
完整代码请参考:github.com/zhangjun0x...
源码解析JdbcTableSource
以JDBC为例,看看Flink底层是如何实现的。
JdbcTableSource#isAsyncEnabled方法返回false,即不支持异步查询,因此进入JdbcTableSource#getLookupFunction方法。
最终构造一个JdbcLookupFunction对象。
JdbcLookupFunction
接下来看看JdbcLookupFunction类,它是TableFunction的子类,具体使用可参考以下文章:
Flink实战教程-自定义函数之TableFunction
TableFunction的核心是eval方法,在该方法中,主要工作是使用多个keys拼接成SQL查询数据,首先查询缓存,缓存有数据则直接返回,缓存无数据则查询数据库,并将查询结果返回并放入缓存。下次查询时,直接查询缓存。
为什么要加缓存?默认情况下不开启缓存,每次查询都会向维表发送请求,如果数据量较大,会给存储维表的系统造成压力。因此,Flink提供了LRU缓存,查询维表时,先查询缓存,缓存无数据则查询外部系统。如果某个数据查询频率较高,一直被命中,则无法获取新数据。因此,缓存需要设置超时时间,超过这个时间则强制删除该数据,查询外部系统获取新数据。
如何开启缓存?请参考JdbcLookupFunction#open方法:
即cacheMaxSize和cacheExpireMs需要同时设置,构造缓存对象cache来缓存数据。这两个参数对应的DDL属性为lookup.cache.max-rows和lookup.cache.ttl。
对于具体的缓存大小和超时时间的设置,用户需要根据自身情况自行定义,在数据准确性和系统吞吐量之间进行权衡。
Flink源码编译
1. 下载Flink稳定版1..2,可以从官方下载链接获取,将源码同步至远程机器,使用Jetbrains Gateway打开。
2. 以Jetbrains Gateway打开源码,源码目录存放于远程机器,它会自动解析为Maven项目。
3. 注意事项:在flink-runtime-web/pom.xml文件中,需将部分内容替换,具体如下:
确保先安装npm,通过命令`yum install npm`。否则编译过程中可能会出现错误。
为了编译时内存充足,需要调整Maven设置,增加JDK可用内存。在命令行中,可以在/etc/profile中配置,或在Maven配置中指定更大的内存。
编译命令如下,对于Jetbrains Gateway,需在Run Configurations中新增配置,调整执行参数以执行mvn install或mvn clean。
编译完成后,每个模块目标文件夹会生成相应的文件。
4. 接下来进行运行。首先启动JobManager,查看flink-runtime下的StandaloneSessionClusterEntrypoint类,配置文件目录需指定,如`--configDir configpath`,并配置日志参数。
主类缺失时,需在IDEA的项目结构模块中给flink-runtime添加依赖,从flink-dist/target目录下添加jar包。
修改配置文件,将允许访问的IP设置为0.0.0.0,以便外部访问。然后映射web端口,启动JobManager后可通过外部IP访问。
运行TaskManager的参数与JobManager类似,启动后自动注册到JobManager,外部访问验证成功。
源码编译与启动完成后,其他机器无需重复编译,只需在相应环境中执行预编译的可执行文件,即可实现分布式环境的Flink使用。
Flink Collector Output 接口源码解析
Flink Collector Output 接口源码解析
Flink中的Collector接口和其扩展Output接口在数据传递中起关键作用。Output接口增加了Watermark功能,是数据传输的基石。本文将深入解析collect方法及相关重要实现类,帮助理解数据传递的逻辑和场景划分。Collector和Output接口
Collector接口有2个核心方法,Output接口则增加了4个功能,WatermarkGaugeExposingOutput接口则专注于显示Watermark值。主要关注collect方法,它是数据发送的核心操作,Flink中有多个Output实现类,针对不同场景如数据传递、Metrics统计、广播和时间戳处理。Output实现类分类
Output类可以归类为:同一operatorChain内的数据传递(如ChainingOutput和CopyingChainingOutput)、跨operatorChain间(RecordWriterOutput)、统计Metrics(CountingOutput)、广播(BroadcastingOutputCollector)和时间戳处理(TimestampedCollector)。示例应用与调用链路
通过一个示例,我们了解了Kafka Source与Map算子之间的数据传递使用ChainingOutput,而Map到Process之间的传递则用RecordWriterOutput。在不同Output的选择中,objectReuse配置起着决定性作用,影响性能和安全性。 总结来说,ChainingOutput用于operatorChain内部,RecordWriterOutput处理跨chain,CountingOutput负责Metrics,BroadcastingOutputCollector用于广播,TimestampedCollector则用于设置时间戳。开启objectReuse会影响选择的Output类型。阅读推荐
Flink任务实时监控
Flink on yarn日志收集
Kafka Connector更新
自定义Kafka反序列化
SQL JSON Format源码解析
Yarn远程调试源码
State Processor API状态操作
侧流输出源码
Broadcast流状态源码解析
Flink启动流程分析
Print SQL Connector取样功能
Flink源码分析——Checkpoint源码分析(二)
《Flink Checkpoint源码分析》系列文章深入探讨了Flink的Checkpoint机制,本文聚焦于Task内部状态数据的存储过程,深入剖析状态数据的具体存储方式。Flink的Checkpoint核心逻辑被封装在`snapshotStrategy.snapshot()`方法中,这一过程主要由`HeapSnapshotStrategy`实现。在进行状态数据的快照操作时,首先对状态数据进行拷贝,这里采取的是引用拷贝而非实例拷贝,速度快且占用内存较少。拷贝后的状态数据被写入到一个临时的`CheckpointStateOutputStream`,即`$CHECKPOINT_DIR/$UID/chk-n`格式的目录,这个并非最终数据存储位置。
在拷贝和初始化输出流后,`AsyncSnapshotCallable`被创建,其`callInternal()`方法中负责将状态数据持久化至磁盘。这个过程分为几个关键步骤:
获取`CheckpointStateOutputStream`,写入状态数据元数据,如状态名、序列化类型等。
对状态数据按`keyGroupId`进行分组,依次将每个`keyGroupId`对应的状态数据写入文件。
封装状态数据的元数据信息,包括存储路径和大小,以及每个`keyGroupId`在文件中的偏移位置。
在分组过程中,状态数据首先被扁平化并添加到`partitioningSource[]`中,同时记录每个元素对应的`keyGroupId`在`counterHistogram[]`中的位置。构建直方图后,数据依据`keyGroupId`进行排序并写入文件,同时将偏移位置记录在`keyGroupOffsets[]`中。具体实现细节中,`FsCheckpointStateOutputStream`用于创建文件系统输出流,配置包括基路径、文件系统类型、缓冲大小、文件状态阈值等。`StreamStateHandle`最终封装了状态数据的存储文件路径和大小信息,而`KeyedStateHandle`进一步包含`StreamStateHandle`和`keyGroupRangeOffsets`,后者记录了每个`keyGroupId`在文件中的存储位置,以供状态数据检索使用。
简而言之,Flink在执行Checkpoint时,通过一系列精心设计的步骤,确保了状态数据的高效、安全存储。从状态数据的拷贝到元数据的写入,再到状态数据的持久化,每一个环节都充分考虑了性能和数据完整性的需求,使得Flink的实时计算能力得以充分发挥。
深度解析Flink flatMap算子的自定义方法(附代码例子)
本文深入解读了Flink中flatMap算子的自定义方法,并提供了代码实例。在使用Flink的算子时,通常需要自定义,自定义时可以采用Lambda表达式或继承并重写函数类。
对于map、flatMap、reduce等操作,开发者可以实现MapFunction、FlatMapFunction、ReduceFunction等接口类。这些函数类拥有泛型参数,定义了输入或输出数据类型。要自定义函数,需要继承这些类并重写内部函数,例如FlatMapFunction接口由Flink的Function接口继承,且具备Serializable接口,用于确保在任务管理器之间进行序列化和反序列化。
在使用FlatMapFunction时,接口定义了两个泛型参数:T和O,分别对应输入和输出数据类型。自定义函数主要关注重写flatMap方法,该方法接受输入值value和Collector类out作为参数,负责处理输入数据并输出相应的结果。
本文提供了一个继承FlatMapFunction并实现flatMap的示例,用于对长度超过特定限制的字符串进行切词处理。
当处理逻辑简单时,使用Lambda表达式可能是更优的选择。Flink的Scala源码中提供三种定义flatMap的实现方式,每种方式在Lambda表达式的输入、输出类型和使用场景上有所不同。Lambda表达式可以简化代码编写,但需要注意类型匹配,以避免Intellij IDEA的类型检查提示。
本文还介绍了另一种实现方法——使用Intellij IDEA的类型检查和匹配功能,帮助开发者在代码编写过程中快速识别并修正类型不匹配的问题。
在某些情况下,Flink提供了更高级的Rich函数类,增加了Rich前缀的函数类在普通的函数类基础上增加了额外的功能,如RuntimeContext的访问,用于在分布式环境下进行更复杂的操作,如累加器的使用。
综上所述,Flink的自定义方法提供了丰富的功能,包括Lambda表达式、普通函数类和Rich函数类等。开发者可以根据实际需求选择合适的方法进行自定义,以实现高效的数据处理任务。