1.Spark Core读取ES的源码分区问题分析
Spark Core读取ES的分区问题分析
撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,源码生成的源码RDD分区数与哪些因素相关。
初步推测,源码这与分片数有关,源码但具体关系是源码appletv源码直通什么呢?以下是两种可能的关系:
1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。源码
2).ES支持游标查询,源码那么是源码否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?
下面,源码我将与大家共同探讨源码,源码了解具体情况。源码
1.Spark Core读取ES
ES官网提供了elasticsearch-hadoop插件,源码对于ES 7.x,源码hadoop和Spark版本的源码支持如下:
在此,我使用的ES版本为7.1.1,测试用的功德app源码Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:
a,导入整个elasticsearch-hadoop包
b,仅导入spark模块的包
为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:
可以看到,Spark Core读取RDD主要有两种形式的npoi源码使用API:
a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。
b,esJsonRDD。这种返回的netcat源码编译也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。
尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。
要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。
2.源码分析
首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,源码溢出判断可以直接导入idea,然后切换到7.x版本。
接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:
esPartitions是一个lazy型的变量:
这种声明的原因是什么呢?
lazy+transient的原因大家可以思考一下。
RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:
a).findSlicePartitions
这个方法实际上是在5.x及以后的ES版本,同时配置了
之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:
实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。
这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。
b).findShardPartitions方法
这个方法没有疑问,一个RDD分区对应于ES index的一个分片。
3.总结
以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数
进行拆分。