【appletv源码直通】【功德app源码】【npoi源码使用】spark esrdd 源码

时间:2024-12-29 08:59:55 编辑:iapp源码画质 来源:盛世源码搭建

1.Spark Core读取ES的源码分区问题分析

spark esrdd 源码

Spark Core读取ES的分区问题分析

       撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,源码生成的源码RDD分区数与哪些因素相关。

       初步推测,源码这与分片数有关,源码但具体关系是源码appletv源码直通什么呢?以下是两种可能的关系:

       1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。源码

       2).ES支持游标查询,源码那么是源码否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?

       下面,源码我将与大家共同探讨源码,源码了解具体情况。源码

       1.Spark Core读取ES

       ES官网提供了elasticsearch-hadoop插件,源码对于ES 7.x,源码hadoop和Spark版本的源码支持如下:

       在此,我使用的ES版本为7.1.1,测试用的功德app源码Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

       a,导入整个elasticsearch-hadoop包

       b,仅导入spark模块的包

       为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:

       可以看到,Spark Core读取RDD主要有两种形式的npoi源码使用API:

       a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

       b,esJsonRDD。这种返回的netcat源码编译也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

       尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。

       要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。

       2.源码分析

       首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,源码溢出判断可以直接导入idea,然后切换到7.x版本。

       接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:

       esPartitions是一个lazy型的变量:

       这种声明的原因是什么呢?

       lazy+transient的原因大家可以思考一下。

       RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:

       a).findSlicePartitions

       这个方法实际上是在5.x及以后的ES版本,同时配置了

       之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

       实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。

       这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。

       b).findShardPartitions方法

       这个方法没有疑问,一个RDD分区对应于ES index的一个分片。

       3.总结

       以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数

       进行拆分。