1.Hadoop3.0å°åºï¼Sparkä¼å代Hadoopå
2.Spark-Submit 源码剖析
3.Spark源码解析2-YarnCluster模式启动
4.为ä»ä¹Sparkåå±ä¸å¦Hadoop
Hadoop3.0å°åºï¼Sparkä¼å代Hadoopå
(1)å 说äºè ä¹é´çåºå«å§ã
é¦å ï¼Hadoopä¸Spark解å³é®é¢çå±é¢ä¸åã
HadoopåApache Spark两è é½æ¯å¤§æ°æ®æ¡æ¶ï¼ä½æ¯åèªåå¨çç®çä¸å°½ç¸åãHadoopå®è´¨ä¸æ´å¤æ¯ä¸ä¸ªåå¸å¼æ°æ®åºç¡è®¾æ½: å®å°å·¨å¤§çæ°æ®éåæ´¾å°ä¸ä¸ªç±æ®é计ç®æºç»æçé群ä¸çå¤ä¸ªèç¹è¿è¡åå¨ï¼æå³çæ¨ä¸éè¦è´ä¹°åç»´æ¤æè´µçæå¡å¨ç¡¬ä»¶ã
åæ¶ï¼Hadoopè¿ä¼ç´¢å¼åè·è¸ªè¿äºæ°æ®ï¼è®©å¤§æ°æ®å¤çååææçè¾¾å°åææªæçé«åº¦ãSparkï¼åæ¯é£ä¹ä¸ä¸ªä¸é¨ç¨æ¥å¯¹é£äºåå¸å¼åå¨ç大æ°æ®è¿è¡å¤ççå·¥å ·ï¼å®å¹¶ä¸ä¼è¿è¡åå¸å¼æ°æ®çåå¨ã
å ¶æ¬¡ï¼è¿æä¸ç¹ä¹å¼å¾æ³¨æââè¿ä¸¤è çç¾é¾æ¢å¤æ¹å¼è¿¥å¼ãå 为Hadoopå°æ¯æ¬¡å¤çåçæ°æ®é½åå ¥å°ç£çä¸ï¼æä»¥å ¶å¤©çå°±è½å¾æå¼¹æ§ç对系ç»é误è¿è¡å¤çã
Sparkçæ°æ®å¯¹è±¡åå¨å¨åå¸äºæ°æ®é群ä¸çå«åå¼¹æ§åå¸å¼æ°æ®é(RDD: Resilient Distributed Dataset)ä¸ãè¿äºæ°æ®å¯¹è±¡æ¢å¯ä»¥æ¾å¨å åï¼ä¹å¯ä»¥æ¾å¨ç£çï¼æ以RDDåæ ·ä¹å¯ä»¥æä¾å®æçç¾é¾æ¢å¤åè½ã
ç±äºä¸¤è ç侧éç¹ä¸åï¼ä½¿ç¨åºæ¯ä¸åï¼å ¶å®å¹¶æ²¡ææ¿ä»£ä¹è¯´ãSparkæ´éåäºè¿ä»£è¿ç®æ¯è¾å¤çMLåDMè¿ç®ãå 为å¨Sparkéé¢ï¼æRDDçæ¦å¿µãRDDå¯ä»¥cacheå°å åä¸ï¼é£ä¹æ¯æ¬¡å¯¹RDDæ°æ®éçæä½ä¹åçç»æï¼é½å¯ä»¥åæ¾å°å åä¸ï¼ä¸ä¸ä¸ªæä½å¯ä»¥ç´æ¥ä»å åä¸è¾å ¥ï¼çå»äºMapReduce大éçç£çIOæä½ãä½æ¯ï¼æ们ä¹è¦çå°sparkçéå¶ï¼å åãæ认为Hadoopè½ç¶è´¹æ¶ï¼ä½æ¯å¨OLAPç大è§æ¨¡æ°æ®çåºç¨åºæ¯ï¼è¿æ¯å欢è¿çãç®åHadoop涵çäºä»æ°æ®æ¶éãå°åå¸å¼åå¨ï¼åå°åå¸å¼è®¡ç®çå个é¢åï¼å¨åé¢åé½æèªå·±ç¬ç¹ä¼å¿ã
(2)为ä»ä¹æè¿ä¹å¤äººä¸ç好Hadoopï¼åæ§Sparkå¢?
å¾å¤äººå¨è°å°Spark代æ¿Hadoopçæ¶åï¼å ¶å®å¾å¤§ç¨åº¦ä¸æçæ¯ä»£æ¿MapReduceã
MapReduceç缺é·å¾å¤ï¼æ大ç缺é·ä¹ä¸æ¯Map + Reduceç模åãè¿ä¸ªæ¨¡å并ä¸éåæè¿°å¤æçæ°æ®å¤çè¿ç¨ãå¾å¤å ¬å¸æåç§å¥æªçMachine Learning计ç®ç¨MR模åæè¿°ï¼ä¸æææMRæ½åï¼å¯¹ç³»ç»å·¥ç¨å¸åOpsä¹æ¯æ大ææäºãå¾å¤è®¡ç®ï¼æ¬è´¨ä¸å¹¶ä¸æ¯ä¸ä¸ªMapï¼ShuffleåReduceçç»æï¼æ¯å¦æç¼è¯ä¸ä¸ªSubQueryçSQLï¼æ¯ä¸ªQueryé½åä¸æ¬¡Group Byï¼æå¯è½éè¦Mapï¼Reduce+Reduceï¼ä¸é´ä¸å¸æææ ç¨çMap;åæè æéè¦Joinï¼è¿å¯¹MapReduceæ¥è¯´ç®ç´æ¯å©æ¢¦ï¼ä»ä¹ç»å·¦å³è¡¨å æ ç¾ï¼å°è¡¨ç¨Distributed Cacheååï¼åç§ä¸åJoinçHackï¼é½æ¯å 为MapReduceæ¬èº«æ¯ä¸ç´æ¥æ¯æJoinçï¼å ¶å®æéè¦çæ¯ï¼ä¸¤ç»ä¸åç计ç®èç¹æ«æäºæ°æ®ä¹åæç §Keyååæ°æ®å°ä¸ä¸ä¸ªé¶æ®µå计ç®ï¼å°±è¿ä¹ç®åçè§åèå·²;åæè æè¦è¡¨ç¤ºä¸ç»å¤æçæ°æ®Pipelineï¼æ°æ®å¨ä¸ä¸ªæ æ°èç¹ç»æçå¾ä¸æµå¨ï¼èå 为MapReduceçåæ¿æ¨¡åï¼æå¿ é¡»ä¸æ¬¡ä¸æ¬¡å¨ä¸ä¸ªMap/Reduceæ¥éª¤å®æä¹åä¸å¿ è¦å°ææ°æ®åå°ç£çä¸å读åºï¼æè½ç»§ç»ä¸ä¸ä¸ªèç¹ï¼å 为Map Reduce2个é¶æ®µå®æä¹åï¼å°±ç®æ¯ä¸ä¸ªç¬ç«è®¡ç®æ¥éª¤å®æï¼å¿ å®ä¼åå°ç£çä¸çå¾ ä¸ä¸ä¸ªMap Reduce计ç®ã
ä¸é¢è¿äºé®é¢ï¼ç®æ¯æ¯ä¸ªå·ç§°ä¸ä¸ä»£å¹³å°é½å°è¯è§£å³çãç°å¨å·ç§°æ¬¡ä¸ä»£å¹³å°ç°å¨åçç¸å¯¹æåæ¯çæ¯HortonworksçTezåDatabricksçSparkãä»ä»¬é½å°è¯è§£å³äºä¸é¢è¯´çé£äºé®é¢ãTezåSparké½å¯ä»¥å¾èªç±å°æè¿°ä¸ä¸ªJobéæ§è¡æµãä»ä»¬ç¸å¯¹ç°å¨çMapReduce模åæ¥è¯´ï¼æ大çæåäºå¯¹åç§å¤æå¤ççç´æ¥æ¯æï¼ä¸éè¦åç»å°½èæ±âææâMR模åçæ½åã综ä¸ï¼Sparkæ°æ®å¤çé度ç§æMapReduceå ä¸ºå ¶å¤çæ°æ®çæ¹å¼ä¸ä¸æ ·ï¼ä¼æ¯MapReduceå¿«ä¸å¾å¤ã
(3)å¯ä»¥å¤Hadoopâæ»åâå?
ç®åå¤å追æ§çSparkè¿æå¾å¤ç¼ºé·ï¼æ¯å¦ï¼
1ã稳å®æ§æ¹é¢ï¼ç±äºä»£ç è´¨éé®é¢ï¼Sparké¿æ¶é´è¿è¡ä¼ç»å¸¸åºéï¼å¨æ¶ææ¹é¢ï¼ç±äºå¤§éæ°æ®è¢«ç¼åå¨RAMä¸ï¼Javaåæ¶åå¾ç¼æ ¢çæ åµä¸¥éï¼å¯¼è´Sparkæ§è½ä¸ç¨³å®ï¼å¨å¤æåºæ¯ä¸SQLçæ§è½çè³ä¸å¦ç°æçMap/Reduceã
2ãä¸è½å¤ç大æ°æ®ï¼åç¬æºå¨å¤çæ°æ®è¿å¤§ï¼æè ç±äºæ°æ®åºç°é®é¢å¯¼è´ä¸é´ç»æè¶ è¿RAMç大å°æ¶ï¼å¸¸å¸¸åºç°RAM空é´ä¸è¶³ææ æ³å¾åºç»æãç¶èï¼Map/Reduceè¿ç®æ¡æ¶å¯ä»¥å¤ç大æ°æ®ï¼å¨è¿æ¹é¢ï¼Sparkä¸å¦Map/Reduceè¿ç®æ¡æ¶ææã
3ãä¸è½æ¯æå¤æçSQLç»è®¡;ç®åSparkæ¯æçSQLè¯æ³å®æ´ç¨åº¦è¿ä¸è½åºç¨å¨å¤ææ°æ®åæä¸ãå¨å¯ç®¡çæ§æ¹é¢ï¼SparkYARNçç»åä¸å®åï¼è¿å°±ä¸ºä½¿ç¨è¿ç¨ä¸åä¸é忧ï¼å®¹æåºç°åç§é¾é¢ã
SparkåHadoopè°å¼ºè°å¼±ï¼å¨æ¯è¾HadoopåSparkæ¹é¢è¦è®°ä½çæéè¦ä¸ç¹å°±æ¯ï¼å®ä»¬å¹¶ä¸æ¯éæ¤å³å½¼çå ³ç³»ï¼å 为å®ä»¬ä¸æ¯ç¸äºææ¥ï¼ä¹ä¸æ¯è¯´ä¸æ¹æ¯å¦ä¸æ¹çç®ææ¿ä»£è ã两è å½¼æ¤å ¼å®¹ï¼è¿ä½¿å¾è¿å¯¹ç»åæ为ä¸ç§åè½æå ¶å¼ºå¤§ç解å³æ¹æ¡ï¼éå诸å¤å¤§æ°æ®åºç¨åºåã
ä¹å°±æ¯è¯´ï¼å¤§æ°æ®è¡ä¸çèé¸ä»¬å¦æåªä¼Hadoopå°±è¦å½å¿äºï¼æ¤åºæ¶é´æ¥å¦ä¹ Sparkåå ¶ä»æ°ææ¯æ¯ç»å¯¹å¿ è¦ç;è对äºç®åæ£åå¤å°è¯å¤§æ°æ®å¹è®çæå们ï¼ä»Hadoopå¼å§ä»ç¶æ¯æ好çéæ©ãé¿è¿æ¥çæ°ææ¯æ»ä¼ä¸æåºç°ï¼ä¸ç®¡æ¯Sparkè¿æ¯Tezä¼¼ä¹é½æçæ´ç¾å¦ç大æ°æ®åæ¯ï¼ç¶è没æ人ä¼åä½ å®å ¨æå¼Hadoopã
Spark-Submit 源码剖析
直奔主题吧:
常规Spark提交任务脚本如下:
其中几个关键的参数:
再看下cluster.conf配置参数,如下:
spark-submit提交一个job到spark集群中,大致的经历三个过程:
代码总Main入口如下:
Main支持两种模式CLI:SparkSubmit;SparkClass
首先是checkArgument做参数校验
而sparksubmit则是通过buildCommand来创建
buildCommand核心是AbstractCommandBuilder类
继续往下剥洋葱AbstractCommandBuilder如下:
定义Spark命令创建的方法一个抽象类,SparkSubmitCommandBuilder刚好是实现类如下
SparkSubmit种类可以分为以上6种。SparkSubmitCommandBuilder有两个构造方法有参数和无参数:
有参数中根据参数传入拆分三种方式,然后通过OptionParser解析Args,构造参数创建对象后核心方法是游戏的源码泄露通过buildCommand,而buildCommand又是通过buildSparkSubmitCommand来生成具体提交。
buildSparkSubmitCommand会返回List的命令集合,分为两个部分去创建此List,
第一个如下加入Driver_memory参数
第二个是通过buildSparkSubmitArgs方法构建的具体参数是MASTER,DEPLOY_MODE,FILES,CLASS等等,apache源码实现这些就和我们上面截图中是对应上的。是通过OptionParser方式获取到。
那么到这里的话buildCommand就生成了一个完成sparksubmit参数的命令List
而生成命令之后执行的任务开启点在org.apache.spark.deploy.SparkSubmit.scala
继续往下剥洋葱SparkSubmit.scala代码入口如下:
SparkSubmit,kill,request都支持,后两个方法知识支持standalone和Mesos集群方式下。dosubmit作为函数入口,其中第一步是初始化LOG,然后初始化解析参数涉及到类
SparkSubmitArguments作为参数初始化类,继承SparkSubmitArgumentsParser类
其中env是测试用的,参数解析如下,parse方法继承了SparkSubmitArgumentsParser解析函数查找 args 中设置的mybatis 3.2.8 源码--选项和值并解析为 name 和 value ,如 --master yarn-client 会被解析为值为 --master 的 name 和值为 yarn-client 的 value 。
这之后调用SparkSubmitArguments#handle(MASTER, "yarn-client")进行处理。
这个函数也很简单,根据参数 opt 及 value,设置各个成员的值。接上例,parse 中调用 handle("--master", "yarn-client")后,在 handle 函数中,master 成员将被赋值为 yarn-client。
回到SparkSubmit.scala通过SparkSubmitArguments生成了args,然后调用action来匹配动作是突击风暴源码submit,kill,request_status,print_version。
直接看submit的action,doRunMain执行入口
其中prepareSubmitEnvironment初始化环境变量该方法返回一个四元 Tuple ,分别表示子进程参数、子进程 classpath 列表、系统属性 map 、子进程 main 方法。完成了提交环境的准备工作之后,接下来就将启动子进程。
runMain则是执行入口,入参则是执行参数SparkSubmitArguments
Main执行非常的简单:几个核心步骤
先是打印一串日志(可忽略),然后是openfire 源码启动创建了loader是把依赖包jar全部导入到项目中
然后是MainClass的生成,异常处理是ClassNotFoundException和NoClassDeffoundError
再者是生成Application,根据MainClass生成APP,最后调用start执行
具体执行是SparkApplication.scala,那么继续往下剥~
仔细阅读下SparkApplication还是挺深的,所以打算另外写篇继续深入研读~
Spark源码解析2-YarnCluster模式启动
YARN 模式运行机制主要体现在Yarn Cluster 模式和Yarn Client 模式上。在Yarn Cluster模式下,SparkSubmit、ApplicationMaster 和 CoarseGrainedExecutorBackend 是独立的进程,而Driver 是独立的线程;Executor 和 YarnClusterApplication 是对象。在Yarn Client模式下,SparkSubmit、ApplicationMaster 和 YarnCoarseGrainedExecutorBackend 也是独立的进程,而Executor和Driver是对象。
在源码中,SparkSubmit阶段首先执行Spark提交命令,底层执行的是开启SparkSubmit进程的命令。代码中,SparkSubmit从main()开始,根据运行模式获取后续要反射调用的类名赋给元组中的ChildMainClass。如果是Yarn Cluster模式,则为YarnClusterApplication;如果是Yarn Client模式,则为主类用户自定义的类。接下来,获取ChildMainClass后,通过反射调用main方法的过程,反射获取类然后通过构造器获取一个示例并多态为SparkApplication,再调用它的start方法。随后调用YarnClusterApplication的start方法。在YarnClient中,new一个Client对象,其中包含了yarnClient = YarnClient.createYarnClient属性,这是Yarn在SparkSubmit中的客户端,yarnClient在第行初始化和开始,即连接Yarn集群或RM。之后就可以通过这个客户端与Yarn的RM进行通信和提交应用,即调用run方法。
ApplicationMaster阶段主要涉及开启一个Driver新线程、AM向RM注册、AM向RM申请资源并处理、封装ExecutorBackend启动命令以及AM向NM通信提交命令由NM启动ExecutorBackend。在ApplicationMaster进程中,首先开启Driver线程,开始运行用户自定义代码,创建Spark程序入口SparkContext,接着创建RDD,生成job,划分阶段提交Task等操作。
在申请资源之前,AM主线程创建了Driver的终端引用,作为参数传入createAllocator(),因为Executor启动后需要向Driver反向注册,所以启动过程必须封装Driver的EndpointRef。AM主线程向RM申请获取可用资源Container,并处理这些资源。ExecutorBackend阶段尚未完成,后续内容待补充。
为ä»ä¹Sparkåå±ä¸å¦Hadoop
Sparkæ¯ä¸ä¸ªåºäºRAM计ç®çå¼æºç ComputerClusterè¿ç®ç³»ç»ï¼ç®çæ¯æ´å¿«éå°è¿è¡æ°æ®åæãSparkæ©æçæ ¸å¿é¨å代ç åªæ3ä¸è¡ãSparkæä¾äºä¸HadoopMap/Reduceç¸ä¼¼çåæ£å¼è¿ç®æ¡æ¶ï¼ä½åºäºRAMåä¼å设计ï¼å æ¤å¨äº¤æ¢å¼æ°æ®åæådataminingçWorkloadä¸è¡¨ç°ä¸éã
è¿å ¥å¹´ä»¥åï¼Sparkå¼æºç çæç³»ç»å¤§å¹ å¢é¿ï¼å·²æ为大æ°æ®èç´ææ´»è·çå¼æºç 项ç®ä¹ä¸ãSparkä¹æ以æå¦æ¤å¤çå ³æ³¨ï¼åå 主è¦æ¯å 为Sparkå ·æçé«æ§è½ãé«çµæ´»æ§ãä¸Hadoopçæç³»ç»å®ç¾èåçä¸æ¹é¢çç¹ç¹ã
é¦å ï¼Spark对åæ£çæ°æ®éè¿è¡æ½æ ·ï¼åæ°å°æåºRDD(ResilientDistributedDataset)çæ¦å¿µï¼ææçç»è®¡åæä»»å¡è¢«ç¿»è¯æ对RDDçåºæ¬æä½ç»æçæåæ ç¯å¾(DAG)ãRDDå¯ä»¥è¢«é©»çå¨RAMä¸ï¼å¾åçä»»å¡å¯ä»¥ç´æ¥è¯»åRAMä¸çæ°æ®;åæ¶åæDAGä¸ä»»å¡ä¹é´çä¾èµæ§å¯ä»¥æç¸é»çä»»å¡å并ï¼ä»èåå°äºå¤§éä¸åç¡®çç»æè¾åºï¼æ大åå°äºHarddiskI/Oï¼ä½¿å¤ææ°æ®åæä»»å¡æ´é«æãä»è¿ä¸ªæ¨ç®ï¼å¦æä»»å¡å¤å¤æï¼Sparkæ¯Map/Reduceå¿«ä¸å°ä¸¤åã
å ¶æ¬¡ï¼Sparkæ¯ä¸ä¸ªçµæ´»çè¿ç®æ¡æ¶ï¼éååæ¹æ¬¡å¤çãå·¥ä½æµã交äºå¼åæãæµéå¤ççä¸åç±»åçåºç¨ï¼å æ¤Sparkä¹å¯ä»¥æ为ä¸ä¸ªç¨é广æ³çè¿ç®å¼æï¼å¹¶å¨æªæ¥å代Map/Reduceçå°ä½ã
æåï¼Sparkå¯ä»¥ä¸Hadoopçæç³»ç»çå¾å¤ç»ä»¶äºç¸æä½ãSparkå¯ä»¥è¿è¡å¨æ°ä¸ä»£èµæºç®¡çæ¡æ¶YARNä¸ï¼å®è¿å¯ä»¥è¯»åå·²æ并åæ¾å¨Hadoopä¸çæ°æ®ï¼è¿æ¯ä¸ªé常大çä¼å¿ã
è½ç¶Sparkå ·æ以ä¸ä¸å¤§ä¼ç¹ï¼ä½ä»ç®åSparkçåå±ååºç¨ç°ç¶æ¥çï¼Sparkæ¬èº«ä¹åå¨å¾å¤ç¼ºé·ï¼ä¸»è¦å æ¬ä»¥ä¸å 个æ¹é¢ï¼
â稳å®æ§æ¹é¢ï¼ç±äºä»£ç è´¨éé®é¢ï¼Sparké¿æ¶é´è¿è¡ä¼ç»å¸¸åºéï¼å¨æ¶ææ¹é¢ï¼ç±äºå¤§éæ°æ®è¢«ç¼åå¨RAMä¸ï¼Javaåæ¶åå¾ç¼æ ¢çæ åµä¸¥éï¼å¯¼è´Sparkæ§è½ä¸ç¨³å®ï¼å¨å¤æåºæ¯ä¸SQLçæ§è½çè³ä¸å¦ç°æçMap/Reduceã
âä¸è½å¤ç大æ°æ®ï¼åç¬æºå¨å¤çæ°æ®è¿å¤§ï¼æè ç±äºæ°æ®åºç°é®é¢å¯¼è´ä¸é´ç»æè¶ è¿RAMç大å°æ¶ï¼å¸¸å¸¸åºç°RAM空é´ä¸è¶³ææ æ³å¾åºç»æãç¶èï¼Map/Reduceè¿ç®æ¡æ¶å¯ä»¥å¤ç大æ°æ®ï¼å¨è¿æ¹é¢ï¼Sparkä¸å¦Map/Reduceè¿ç®æ¡æ¶ææã
âä¸è½æ¯æå¤æçSQLç»è®¡;ç®åSparkæ¯æçSQLè¯æ³å®æ´ç¨åº¦è¿ä¸è½åºç¨å¨å¤ææ°æ®åæä¸ãå¨å¯ç®¡çæ§æ¹é¢ï¼SparkYARNçç»åä¸å®åï¼è¿å°±ä¸ºä½¿ç¨è¿ç¨ä¸åä¸é忧ï¼å®¹æåºç°åç§é¾é¢ã
è½ç¶Sparkæ´»è·å¨ClouderaãMapRãHortonworksçä¼å¤ç¥å大æ°æ®å ¬å¸ï¼ä½æ¯å¦æSparkæ¬èº«ç缺é·å¾ä¸å°åæ¶å¤çï¼å°ä¼ä¸¥éå½±åSparkçæ®åååå±ã