Spark源码阅读:SparkContext的初始化
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
上篇文章Spark源码阅读:Driver的注册与启动,介绍了Driver的注册和启动流程,在启动时,实际提交用户应用程序命令如下,SparkPi程序正式启动
/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ -Xmx1024M -Dspark.history.kerberos.keytab=none -Dspark.eventLog.enabled=true -Dspark.yarn.historyServer.address=192.168.1.20:18081 -Dspark.jars=file:/usr/hdp/2.6.0.3-8/spark2/bin/../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar -Dspark.submit.deployMode=cluster -Dspark.history.ui.port=18081 -Dspark.executor.memory=2G -Dspark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 -Dspark.yarn.queue=default -Dspark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 -Dspark.history.fs.logDirectory=hdfs:///spark2-history/ -Dspark.driver.supervise=false -Dspark.app.name=org.apache.spark.examples.SparkPi -Dspark.history.kerberos.principal=none -Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider -Dspark.cores.max=5 -Dspark.rpc.askTimeout=10s -Dspark.master=spark://192.168.1.20:7077 -Dspark.eventLog.dir=hdfs:///spark2-history/ org.apache.spark.deploy.worker.DriverWrapper spark://Worker@192.168.1.20:40612 /usr/hdp/2.6.0.3-8/spark2/work/driver-20180423114006-0009/spark-examples_2.11-2.1.0.2.6.0.3-8.jar org.apache.spark.examples.SparkPi 1000
SparkPi程序代码如下:
object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() } }
其中有两个步骤,很重要
1.创建一个SparkConf对象
2.创建一个SparkContext对象,SparkContext是这个Spark程序的入口
那么今天的文章,就来围绕着这两个步骤来说
先来说一下SparkConf
SparkConf
Spark作为一个优秀的计算框架,配备了各种各样的系统配置参数
SparkConf是Spark的配置类,Spark中的每个组件都是直接或者间接的使用着SparkConf存储的属性,这些属性以键值对的形式,存储于如下的数据结构中
private val settings = new ConcurrentHashMap[String, String]()
获取SparkConf的配置信息通过如下3种方式:
1.从系统属性中获取
2.SparkConf中API进行配置
3.从其他克隆的SparkConf中获取
从系统属性中获取
SparkConf中有一个布尔类型的loadDefaults构造器属性,当它为true时,加载系统属性,即第一种方式
使用Utils工具类的getSystemProperties 方法,获取所有以spark.为前缀的key和value,然后调用了set方法(下面会说),将配置添加到settings中去
// 加载默认配置 if (loadDefaults) { loadFromSystemProperties(false) } private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = { // Load any spark.* system properties // 加载所有键值对,获取前缀为spark的key和value for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { // 调用set方法 set(key, value, silent) } this }
SparkConf中API进行配置
我们通常使用SparkConf的API进行添加Spark配置,主要使用set方法
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = { // 如果key为空,抛出空指针异常 if (key == null) { throw new NullPointerException("null key") } // 如果value为空,抛出空指针异常 if (value == null) { throw new NullPointerException("null value for " + key) } if (!silent) { logDeprecationWarning(key) } // 添加到settings中 settings.put(key, value) this }
其中有些API调用了是set的重载方法
def set(key: String, value: String): SparkConf = { set(key, value, false) }
比如常见的setMaster、setAppName、setJars都是调用了重载的set方法
def setMaster(master: String): SparkConf = { set("spark.master", master) } def setAppName(name: String): SparkConf = { set("spark.app.name", name) } def setJars(jars: Seq[String]): SparkConf = { for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor") set("spark.jars", jars.filter(_ != null).mkString(",")) }
从其他克隆的SparkConf中获取
在某些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用
例如,组件A中存在一个SparkConf实例a,组件B中也需要实例a中的配置信息,我们可以将SparkConf实例定义为全局变量或者通过参数传递给其他组件,但是会有并发问题。虽然settings线程是安全的ConcurrentHashMap数据结构,也适用于高并发,但只要存在并发,就一定会有性能损失。
我们可以新建一个SparkConf实例b,并将a中的配置信息拷贝到b中,但是这种方法并不怎么好,SparkConf继承了Cloneable特质,并实现了clone方法,clone方法通过实现Cloneable特质提高了代码的可复读行
override def clone: SparkConf = { val cloned = new SparkConf(false) settings.entrySet().asScala.foreach { e => cloned.set(e.getKey(), e.getValue(), true) } cloned }
值得注意的是,一旦SparkConf对象提交到Spark,就会被克隆(在SparkContext初始化中),且不会被任何用户修改,Spark不支持在运行时修改配置
SparkContext
现在来说SparkContext
SparkContext 的初始化步骤如下:
1)创建Spark 执行环境SparkEnv;
2)创建并初始化Spark UI;
3)Hadoop 相关配置及Executor 环境变量的设置;
4)创建心跳接收器;
5)创建任务调度器TaskScheduler;
6)创建和启动DAGScheduler;
7)TaskScheduler 的启动;
8)初始化块管理器BlockManager;
9)启动测量系统MetricsSystem;
10)创建和启动Executor 分配管理器ExecutorAllocationManager;
11)ContextCleaner 的创建与启动;
12)Spark 环境更新;
13)创建DAGSchedulerSource 和BlockManagerSource;
14)将SparkContext 标记为激活。
主要阅读TaskScheduler、DAGScheduler的创建和启动,其他部分先简单的看一下,等到具体模块时,再做研究。
SparkContext中首先对SparkConf进行克隆
然后对克隆的SparkConf的配置进行检测,检查非法或废弃的配置信息,将弃用的配置信息转换成支持的配置信息
_conf = config.clone() _conf.validateSettings()
然后判断SparkConf中是否设置了master和app name,master用于设置部署模式,app name用于指定应用程序名称,这两个配置很重要
if (!_conf.contains("spark.master")) { throw new SparkException("A master URL must be set in your configuration") } if (!_conf.contains("spark.app.name")) { throw new SparkException("An application name must be set in your configuration") }
省略了一些代码,来看创建SparkEnv,createSparkEnv方法其实是调用createDriverEnv创建SparkEnv
// 创建Spark Env _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env)
private[spark] def createSparkEnv( conf: SparkConf, //conf是对SparkConf的复制 isLocal: Boolean, //isLocal标识判断是否单机 listenerBus: LiveListenerBus): SparkEnv = { // 其实是调用createDriverEnv创建SparkEnv SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) }
createDriverEnv方法中,获取Driver相关信息,然后调用了SparkEnv私有的create方法
private[spark] def createDriverEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus, numCores: Int, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains(DRIVER_HOST_ADDRESS), s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!") assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") // 从spark conf中获取Driver绑定地址 val bindAddress = conf.get(DRIVER_BIND_ADDRESS) // 从spark conf中获取Driver host地址 val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS) // 从spark conf中获取Driver端口 val port = conf.get("spark.driver.port").toInt // 是否设置I/O加密的秘钥 val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) { Some(CryptoStreamUtils.createKey(conf)) } else { None } create( conf, SparkContext.DRIVER_IDENTIFIER, bindAddress, advertiseAddress, port, isLocal, numCores, ioEncryptionKey, listenerBus = listenerBus, mockOutputCommitCoordinator = mockOutputCommitCoordinator ) }
创建完SparkEnv后,创建并初始化UI
_ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None } // 在启动ask scheduler之前绑定UI // 正确地连接到集群管理器 _ui.foreach(_.bind())
获取hadoop配置信息
// 获取hadoop配置信息 _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
获取Executor内存环境变量
// 获取executor 内存环境变量 _executorMemory = _conf.getOption("spark.executor.memory") .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")) .map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(1024)
driver创建心跳接收器用来和executor进行交互
// 创建心跳接收器 _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
创建TaskScheduler,创建schedulerBackend ,创建DAGScheduler,并启动DAGScheduler
//创建TaskScheduler调度器 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
调用TaskScheduler的start方法,启动TaskScheduler
_taskScheduler.start()
blockManager进行初始化
blockManager是存储模块中的重要组件
// blockManager进行初始化 _env.blockManager.initialize(_applicationId)
启动metricsSystem测量系统,是Spark应用程序的度量系统。
// 启动metricsSystem测量系统 _env.metricsSystem.start()
创建和启动Executor分配管理器 ExecutorAllocationManager
ExecutorAllocationManager是基于工作负载动态分配Executor和删除Executor的代理
// 创建和启动Executor分配管理器 ExecutorAllocationManager val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf)) case _ => None } } else { None } _executorAllocationManager.foreach(_.start())
ContextCleaner 的创建与启动
ContextCleaner 用于清理那些超出范围的RDD、Shuffle对应的map任务状态、Shuffel元数据、Broadcast对象及RDD的CheckPoint数据。
// ContextCleaner 的创建与启动 _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } _cleaner.foreach(_.start())
设置并启动监听总线ListenerBus
// 设置并启动监听总线ListenerBus setupAndStartListenerBus()
spark 环境更新
// spark 环境更新 postEnvironmentUpdate()
向事件总线投递Application启动
// 向事件总线投递Application启动 postApplicationStart()
向度量系统注册DAGSchedulerSource,BlockManagerSource,以及executorAllocationManagerSource
// 创建DAGSchedulerSource _env.metricsSystem.registerSource(_dagScheduler.metricsSource) // 创建BlockManagerSource _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) }
将SparkContext标记为激活
// 将SparkContext标记为激活 SparkContext.setActiveContext(this, allowMultipleContexts)
至此,SparkContext的初始化完成
下篇文章将要从创建TaskScheduler,创建schedulerBackend ,创建DAGScheduler开始
我们,下篇文章见~
亲,看完了点个赞呗!
写的非常棒
感谢您的支持
写的好!