1.yarn源码分析(四)AppMaster启动
2.yarn源码分析(二)创建Application
3.Spark Core读取ES的源码分区问题分析
4.hadoop å®è£
å
çåºå«ãå¨çº¿ç谢谢åä½ã
yarn源码分析(四)AppMaster启动
在容器分配完成之后,启动容器的分析代码主要在ContainerImpl.java中进行。通过状态机转换,源码container从NEW状态向其他状态转移时,分析会调用RequestResourceTransition对象。源码RequestResourceTransition负责将所需的分析register源码分析资源进行本地化,或者避免资源本地化。源码若需本地化,分析还需过渡到LOCALIZING状态。源码为简化理解,分析此处仅关注是源码否进行资源本地化的情况。
为了将LAUNCH_CONTAINER事件加入事件处理队列,分析调用了sendLaunchEvent方法。源码该事件由ContainersLauncher负责处理。分析会议通讯源码ContainersLauncher的源码handle方法中,使用一个ExecutorService(线程池)容器Launcher。ContainerLaunch实现了Callable接口,其call方法生成并执行launch_container脚本。以MapReduce框架为例,该脚本在hadoop.tmp.dir/application name/container name目录下生成,其主要作用是启动MRAppMaster进程,即MapReduce的ApplicationMaster。
yarn源码分析(二)创建Application
深入剖析YARN源码中的Application创建机制,核心在于通过client向ResourceManager发起请求。这一过程中,Hadoop RPC协议作为桥梁,确保了客户端与ResourceManager间通信的源码汇3.4.2高效与可靠。客户端通过调用接口ApplicationClientProtocol来执行操作。以`yarnClient.createApplication()`与`yarnClient.submitApplication(appContext)`为例,揭示了创建Application的主要流程。
关注点集中于两个关键步骤:初始化Application及提交Application至ResourceManager。初始化通过`createApplication()`完成,此过程在`YarnClientImpl`类中实现。此方法内部调用`getNewApplication()`以获取ApplicationID,作为后续操作的基础。
获取ApplicationID是创建过程的基石,而其实现细节则深藏于`RMClientService`中。在理解这一部分时,我们需关注`RMClientService`对于长期对象的服务化处理,以及在`YarnClientImpl`中对`submitApplication`调用的ehcache源码编译具体实现。
当ApplicationID获得后,便正式步入提交阶段。通过`submitApplication()`,客户端与ResourceManager间建立联系,资源分配与应用状态监控得以实现。此过程中的关键在于`rmClient.submitApplication`方法的调用,之后通过轮询`ApplicationReport`来监控提交状态,确保应用成功部署。
深入探究`submitApplication`方法的内部逻辑,我们会发现它在`RMClientService`中调用`rmAppManager.submitApplication`,接着通过事件调度器对新建的Application进行处理。这一处理阶段主要负责保存应用信息,同时引入了YARN中的指数源码公式状态机与事件模型概念,将在后续文章中进行详尽解析。
Spark Core读取ES的分区问题分析
撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,生成的RDD分区数与哪些因素相关。
初步推测,这与分片数有关,但具体关系是什么呢?以下是两种可能的关系:
1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。
2).ES支持游标查询,那么是否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?
下面,我将与大家共同探讨源码,了解具体情况。
1.Spark Core读取ES
ES官网提供了elasticsearch-hadoop插件,对于ES 7.x,hadoop和Spark版本的支持如下:
在此,我使用的ES版本为7.1.1,测试用的Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:
a,导入整个elasticsearch-hadoop包
b,仅导入spark模块的包
为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:
可以看到,Spark Core读取RDD主要有两种形式的API:
a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。
b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。
尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。
要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。
2.源码分析
首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,可以直接导入idea,然后切换到7.x版本。
接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:
esPartitions是一个lazy型的变量:
这种声明的原因是什么呢?
lazy+transient的原因大家可以思考一下。
RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:
a).findSlicePartitions
这个方法实际上是在5.x及以后的ES版本,同时配置了
之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:
实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。
这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。
b).findShardPartitions方法
这个方法没有疑问,一个RDD分区对应于ES index的一个分片。
3.总结
以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数
进行拆分。
hadoop å®è£ å çåºå«ãå¨çº¿ç谢谢åä½ã
hadoop-2.6.0-src.tar.gzæ¯æºç å缩æ件ãå¯ä»¥ç¨eclipseå¯¼å ¥ç 究æºç ï¼æè Mavenæ建ç¼è¯æå ã
hadoop-2.6.0.tar.gzæ¯å·²ç»å®æ¹åå¸çå缩å ï¼å¯ä»¥ç´æ¥ä½¿ç¨ãä¸è¿å®ç½ä¸è½½çhadoopåå¸çæ¬åªéåxç¯å¢ï¼è¥è¦xçåéè¦Mavenéæ°æ建ã
*.mds æ¯æè¿°æ件ï¼è®°å½å缩å çMD5ï¼SHA1çä¿¡æ¯ã