1.下面哪些是flink架构的组成部分
2.flinkåå
¥hdfs
3.大数据开发_Flink_概述,部署,运行架构,流处理API,Window,WaterMark,ProcessFunction,状态编程
4.docker部署大数据平台(hadoop生态及flink)
5.Flink系列十九Flink 作业Hadoop 依赖冲突解决NoSuchMethodError
6.flink1.10.0连接apache-hive-2.3.7(Java程序)
下面哪些是flink架构的组成部分
下面哪些是flink架构的组成部分 Flink 是一个开源的分布式流处理框架,它由以下几个组成部分:Flink 运行时:负责管理 Flink 应用程序的执行,包括任务调度、资源管理、容错等。
Flink 库:提供各种功能,自制网络爬虫源码如数据流处理、批处理、图算法、机器学习等。
Flink SQL:一种用于数据仓库和流处理查询的查询语言,支持将 SQL 查询转换为 Flink 应用程序。
Flink Streaming:一种用于实时数据处理的高级流处理 API,支持事件驱动的新月优胜指标源码流式应用程序。
Flink DataSet API:一种用于批处理和流处理的数据集 API,支持迭代式和批处理式的数据处理。
Flink YARN:一种用于在 YARN 上运行 Flink 应用程序的组件,支持在 Hadoop 生态系统中进行数据分析和处理。
这些组件共同构成了 Flink 框架,使开发人员能够构建高效、可扩展的实时数据处理应用程序。flinkåå ¥hdfs
/
*** 该çç¥å¨ä»¥ä¸ä¸ç§æ åµä¸æ»å¨å¤äº In-progress ç¶æçé¨åæ件ï¼part fileï¼ï¼
*
* å®è³å°å å« åéçæ°æ®
* æè¿ 5 åé没ææ¶å°æ°çè®°å½
* æ件大å°è¾¾å° 1GB ï¼åå ¥æåä¸æ¡è®°å½åï¼
* é¨åæ件ï¼part fileï¼å¯ä»¥å¤äºä»¥ä¸ä¸ç§ç¶æä¹ä¸ï¼
*
* In-progress ï¼å½åæ件æ£å¨åå ¥ä¸
* Pending ï¼å½å¤äº In-progress ç¶æçæä»¶å ³éï¼closedï¼äºï¼å°±å为 Pending ç¶æ
* Finished ï¼å¨æåç Checkpoint åï¼Pending ç¶æå°å为 Finished ç¶æ
*/
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.builder()
.withMaxPartSize(**)// 设置æ¯ä¸ªæ件çæå¤§å¤§å° ,é»è®¤æ¯Mãè¿é设置为1G
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))// è³å°å å«åéçæ°æ®
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))// s空é²ï¼å°±æ»å¨åå ¥æ°çæ件 è¿ 5 åé没ææ¶å°æ°çè®°å½
.build();
/
*** è¾åºé ç½®
*/
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("kefu_client_")//åç¼
.withPartSuffix(".log")//åç¼
.build();
StreamingFileSink sink = StreamingFileSink
.forRowFormat(new Path(properties.getProperty("etl_hadoop_url")
+ properties.getProperty("etl_storm_kefu_basepath")),new SimpleStringEncoder("UTF-8"))
.withRollingPolicy(rollingPolicy)
.withBucketAssigner(new BasePathBucketAssigner())
.withBucketCheckInterval(L)// 桶æ£æ¥é´éï¼è¿é设置为1s
.withOutputFileConfig(config)
.build();
大数据开发_Flink_概述,部署,运行架构,流处理API,Window,WaterMark,ProcessFunction,状态编程
Apache Flink是一个处理框架,专为实时和离线数据流的复杂状态计算设计,旨在提供低延迟、高吞吐量、准确性和容错性的处理能力。
批处理作为其特殊类型,脚本源码代码Flink旨在通过并行处理和分布式架构来优化性能。
快速上手Flink,可选择在Standalone模式部署,通过slot(资源分配的基本单位)来分配资源,或者在生产环境中利用YARN(容器化资源管理)和Hadoop。session-cluster模式适用于小规模短时作业,而per-job-cluster模式则适合大型长期作业,甚至在Kubernetes上运行,以简化运维。
Flink的运行架构包括客户端提交任务,通过HDFS和YARN进行资源管理。任务首先由JobManager调度至TaskManager,TaskManager之间通过流式通信。源码基地杭州亲子客户端负责数据流的准备和提交,而JobManager和TaskManager作为独立的JVM进程运行。
Flink的流处理API基于数据流的链式结构,包括数据源、转换和sink。算子的并行度决定了子任务的数量。对于数据处理,Flink支持多种数据源,如Kafka、Redis、Elasticsearch和自定义JDBC sink。窗口功能将无限流分割为有限流,便于分析。黄金堆战法源码
EventTime和Watermark机制在处理乱序数据时至关重要,通过设置Watermark的延迟,Flink确保数据的准确处理和迟到数据的处理。ProcessFunction API允许开发者访问时间戳、Watermark,以及创建自定义事件驱动应用和业务逻辑。
Flink的核心容错机制是一致性检查点,通过保存任务处理状态实现故障恢复。除了检查点,用户还可以利用保存点进行备份、更新或迁移应用。状态一致性保证了流处理结果的准确性,而端到端的数据保证则确保了整个处理过程的可靠性。
docker部署大数据平台(hadoop生态及flink)
使用docker部署大数据平台(如Hadoop生态及Flink)能够显著缩短部署时间,提高数据开发效率。以下为搭建过程的概要:
主流的大数据平台架构包括数据采集(Flume或Beats)、数据存储(HDFS、Hive、ES、HBase)、实时分析(Flink)、数据查询(Presto、Clickhouse)等组件。
通过docker-compose一键部署,实现大数据平台快速搭建。组件版本如下:Apache Hadoop 3.2、Prestodb 0.、Kafka 2.0+、Hbase 2.2、Hive 3.1.2、ELK 7.9.1、Flink on yarn 1..3。
部署步骤如下:
1. 安装docker,确保系统兼容性。
2. 安装docker-compose工具。
3. 通过git clone获得docker compose文件。
4. 切换至部署目录,运行docker-compose up -d命令启动服务。
当前,各组件的dockerfile文件暂未开源,但所有组件基于Apache开源版本,开发时可安心使用。后续计划整合相关测试工具并开源。
Flink系列十九Flink 作业Hadoop 依赖冲突解决NoSuchMethodError
Flink提交作业时,可能会遇到NoSuchMethodError的异常,这通常与Hadoop依赖冲突有关。查看源码后发现,错误源于2.6.0-cdh5..1版本的FsTracer通过hadoop-common加载了TraceUtils,但实际加载的是2.7.x版本的TraceUtils。因此,问题出在版本兼容性上。有以下两种解决方案:
第一类解决方案是手动从jar包中排除冲突依赖。这需要识别冲突的库,并在Flink构建过程中排除它们,确保加载的库版本与期望一致。
第二类解决方案是通过打包工具精确排除字节码。这可以更细致地控制类加载过程,避免不兼容版本的类被加载。
深入理解这一问题,有助于我们意识到在使用Flink与外部系统集成时,版本兼容性是一个不容忽视的挑战。为避免此类问题,需要仔细管理依赖库的版本,确保它们之间无冲突。
解决此类问题的最新方法(适用于所有Flink版本)在上一篇文章中已有详细描述,参见Flink系列十八HDFS_DELEGATION_TOKEN过期的问题解决汇总。
flink1..0连接apache-hive-2.3.7(Java程序)
通过Java程序连接Apache Flink 1..0与Apache Hive 2.3.7,主要步骤如下:
第一步:添加依赖,包括以下库:
org.apache.flink flink-table-planner-blink_2. 1..0
org.apache.flink flink-connector-hive_2. 1..0
org.apache.flink flink-table-api-java-bridge_2. 1..0
org.apache.hive hive-exec ${ hive.version} provided
org.apache.hadoop hadoop-mapreduce-client-core 2.7.3
org.apache.hadoop hadoop-common 2.7.3
org.apache.hadoop hadoop-mapreduce-client-common 2.7.3
org.apache.hadoop hadoop-mapreduce-client-jobclient 2.7.3
org.apache.flink flink-shaded-hadoop-2-uber 2.7.5-8.0 provided
org.datanucleus datanucleus-api-jdo 4.2.4
org.datanucleus datanucleus-core 4.1.
org.datanucleus datanucleus-rdbms 4.1.
mysql mysql-connector-java 8.0.
org.datanucleus javax.jdo 3.2.0-m3
第二步:编写程序,加载hive-site.xml,并执行如下代码:
java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class HiveConnect {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "ydj";
String hiveConfDir = "D:\\OfficeWork\\WorkSpace\\GitLab\\big-data-extension\\achilles-algorithm-platform\\src\\main\\resources";
String version = "2.3.4";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
String createDbSql = "select * from ydj.center";
Table table = tableEnv.sqlQuery(createDbSql);
System.out.println(table);
}
}