Spark源码阅读:SparkContext的初始化


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


上篇文章Spark源码阅读:Driver的注册与启动,介绍了Driver的注册和启动流程,在启动时,实际提交用户应用程序命令如下,SparkPi程序正式启动

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ 
-Xmx1024M 
-Dspark.history.kerberos.keytab=none 
-Dspark.eventLog.enabled=true 
-Dspark.yarn.historyServer.address=192.168.1.20:18081 
-Dspark.jars=file:/usr/hdp/2.6.0.3-8/spark2/bin/../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar 
-Dspark.submit.deployMode=cluster 
-Dspark.history.ui.port=18081 
-Dspark.executor.memory=2G
-Dspark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 
-Dspark.yarn.queue=default 
-Dspark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 
-Dspark.history.fs.logDirectory=hdfs:///spark2-history/ 
-Dspark.driver.supervise=false 
-Dspark.app.name=org.apache.spark.examples.SparkPi
-Dspark.history.kerberos.principal=none 
-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider 
-Dspark.cores.max=5 
-Dspark.rpc.askTimeout=10s 
-Dspark.master=spark://192.168.1.20:7077 
-Dspark.eventLog.dir=hdfs:///spark2-history/
org.apache.spark.deploy.worker.DriverWrapper
spark://Worker@192.168.1.20:40612  /usr/hdp/2.6.0.3-8/spark2/work/driver-20180423114006-0009/spark-examples_2.11-2.1.0.2.6.0.3-8.jar 
org.apache.spark.examples.SparkPi 
1000

SparkPi程序代码如下:

object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

其中有两个步骤,很重要

1.创建一个SparkConf对象

2.创建一个SparkContext对象,SparkContext是这个Spark程序的入口

那么今天的文章,就来围绕着这两个步骤来说

先来说一下SparkConf

SparkConf

Spark作为一个优秀的计算框架,配备了各种各样的系统配置参数

SparkConf是Spark的配置类,Spark中的每个组件都是直接或者间接的使用着SparkConf存储的属性,这些属性以键值对的形式,存储于如下的数据结构中

private val settings = new ConcurrentHashMap[String, String]()

获取SparkConf的配置信息通过如下3种方式:

1.从系统属性中获取

2.SparkConf中API进行配置

3.从其他克隆的SparkConf中获取

从系统属性中获取

SparkConf中有一个布尔类型的loadDefaults构造器属性,当它为true时,加载系统属性,即第一种方式

使用Utils工具类的getSystemProperties 方法,获取所有以spark.为前缀的key和value,然后调用了set方法(下面会说),将配置添加到settings中去

//  加载默认配置
  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    //  加载所有键值对,获取前缀为spark的key和value
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      //  调用set方法
      set(key, value, silent)
    }
    this
  }

SparkConf中API进行配置

我们通常使用SparkConf的API进行添加Spark配置,主要使用set方法

  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    //  如果key为空,抛出空指针异常
    if (key == null) {
      throw new NullPointerException("null key")
    }
    //  如果value为空,抛出空指针异常
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    if (!silent) {
      logDeprecationWarning(key)
    }
    //  添加到settings中
    settings.put(key, value)
    this
  }

其中有些API调用了是set的重载方法

 def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }

比如常见的setMaster、setAppName、setJars都是调用了重载的set方法

  def setMaster(master: String): SparkConf = {
    set("spark.master", master)
  }
  
  def setAppName(name: String): SparkConf = {
    set("spark.app.name", name)
  }
 
  def setJars(jars: Seq[String]): SparkConf = {
    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
    set("spark.jars", jars.filter(_ != null).mkString(","))
  }

从其他克隆的SparkConf中获取

在某些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用

例如,组件A中存在一个SparkConf实例a,组件B中也需要实例a中的配置信息,我们可以将SparkConf实例定义为全局变量或者通过参数传递给其他组件,但是会有并发问题。虽然settings线程是安全的ConcurrentHashMap数据结构,也适用于高并发,但只要存在并发,就一定会有性能损失。

我们可以新建一个SparkConf实例b,并将a中的配置信息拷贝到b中,但是这种方法并不怎么好,SparkConf继承了Cloneable特质,并实现了clone方法,clone方法通过实现Cloneable特质提高了代码的可复读行

  override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

值得注意的是,一旦SparkConf对象提交到Spark,就会被克隆(在SparkContext初始化中),且不会被任何用户修改,Spark不支持在运行时修改配置


SparkContext

现在来说SparkContext

SparkContext 的初始化步骤如下:

1)创建Spark 执行环境SparkEnv;

2)创建并初始化Spark UI;

3)Hadoop 相关配置及Executor 环境变量的设置;

4)创建心跳接收器;

5)创建任务调度器TaskScheduler;

6)创建和启动DAGScheduler;

7)TaskScheduler 的启动;

8)初始化块管理器BlockManager

9)启动测量系统MetricsSystem;

10)创建和启动Executor 分配管理器ExecutorAllocationManager;

11)ContextCleaner 的创建与启动;

12)Spark 环境更新;

13)创建DAGSchedulerSource 和BlockManagerSource;

14)将SparkContext 标记为激活。

主要阅读TaskScheduler、DAGScheduler的创建和启动,其他部分先简单的看一下,等到具体模块时,再做研究。

SparkContext中首先对SparkConf进行克隆

然后对克隆的SparkConf的配置进行检测,检查非法或废弃的配置信息,将弃用的配置信息转换成支持的配置信息

    _conf = config.clone()
    _conf.validateSettings()

然后判断SparkConf中是否设置了master和app name,master用于设置部署模式,app name用于指定应用程序名称,这两个配置很重要

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

省略了一些代码,来看创建SparkEnv,createSparkEnv方法其实是调用createDriverEnv创建SparkEnv

 //  创建Spark Env
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
 private[spark] def createSparkEnv(
      conf: SparkConf,   //conf是对SparkConf的复制
      isLocal: Boolean,   //isLocal标识判断是否单机
      listenerBus: LiveListenerBus): SparkEnv = {
    //  其实是调用createDriverEnv创建SparkEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) 
  }

createDriverEnv方法中,获取Driver相关信息,然后调用了SparkEnv私有的create方法

  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    //  从spark conf中获取Driver绑定地址
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    //  从spark conf中获取Driver host地址
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    //  从spark conf中获取Driver端口
    val port = conf.get("spark.driver.port").toInt
    //  是否设置I/O加密的秘钥
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      port,
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

创建完SparkEnv后,创建并初始化UI

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    //  在启动ask scheduler之前绑定UI
    //  正确地连接到集群管理器
    _ui.foreach(_.bind())

获取hadoop配置信息

//  获取hadoop配置信息
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

获取Executor内存环境变量

//  获取executor 内存环境变量
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

driver创建心跳接收器用来和executor进行交互

 //  创建心跳接收器
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

创建TaskScheduler,创建schedulerBackend ,创建DAGScheduler,并启动DAGScheduler

//创建TaskScheduler调度器
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

调用TaskScheduler的start方法,启动TaskScheduler

_taskScheduler.start()

blockManager进行初始化

blockManager是存储模块中的重要组件 

 //  blockManager进行初始化
    _env.blockManager.initialize(_applicationId)

启动metricsSystem测量系统,是Spark应用程序的度量系统。

// 启动metricsSystem测量系统
    _env.metricsSystem.start()

创建和启动Executor分配管理器 ExecutorAllocationManager

ExecutorAllocationManager是基于工作负载动态分配Executor和删除Executor的代理

    // 创建和启动Executor分配管理器 ExecutorAllocationManager
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

ContextCleaner 的创建与启动

ContextCleaner 用于清理那些超出范围的RDD、Shuffle对应的map任务状态、Shuffel元数据、Broadcast对象及RDD的CheckPoint数据。

// ContextCleaner 的创建与启动
    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

设置并启动监听总线ListenerBus

//  设置并启动监听总线ListenerBus
    setupAndStartListenerBus()

spark 环境更新

 //  spark 环境更新
    postEnvironmentUpdate()

向事件总线投递Application启动

    //  向事件总线投递Application启动
    postApplicationStart()

向度量系统注册DAGSchedulerSource,BlockManagerSource,以及executorAllocationManagerSource

  //  创建DAGSchedulerSource
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    //  创建BlockManagerSource
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

将SparkContext标记为激活

//  将SparkContext标记为激活
  SparkContext.setActiveContext(this, allowMultipleContexts)

至此,SparkContext的初始化完成

下篇文章将要从创建TaskScheduler,创建schedulerBackend ,创建DAGScheduler开始

我们,下篇文章见~


亲,看完了点个赞呗!

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

3 Responses

  1. 匿名说道:

    写的非常棒

  2. 匿名说道:

    写的好!

发表评论

邮箱地址不会被公开。