Reading the Spark Source Code: The Startup of CoarseGrainedExecutorBackend

  


Series reading guide

If you want to get to know the Spark source code, please read the articles in this series in the following order:

1. Spark Submit job submission

2. Driver registration and startup

3. SparkContext initialization

4. TaskScheduler creation and startup

5. Application registration

6. The Master registration mechanism: Application registration and the Application scheduling algorithm

7. Reading the Spark source code: Executor startup

8. The startup of CoarseGrainedExecutorBackend

9. DAGScheduler source code analysis

10. DAGScheduler stage division and Task preferred-location computation

11. TaskScheduler scheduling algorithms and scheduling pools

12. TaskScheduler task submission and resource allocation

13. Task launch

14. Task result handling


The previous article covered the startup of the executor. At the end of that article, the fetchAndRunExecutor method creates a ProcessBuilder, which executes a command that launches CoarseGrainedExecutorBackend (a minimal sketch of such a ProcessBuilder launch appears after the command below).

The command that gets executed looks like this:

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8  \
-cp /usr/hdp/2.6.0.3-8/spark2/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/   \
-Xmx1024M     \
-Dspark.history.ui.port=18081  \
-Dspark.driver.port=33197  \
org.apache.spark.executor.CoarseGrainedExecutorBackend  \
--driver-url spark://CoarseGrainedScheduler@192.168.1.20:33197  \
--executor-id 0     \
--hostname 192.168.1.20    \
--cores 5    \
--app-id app-20180419170349-0522     \
--worker-url spark://Worker@192.168.1.20:37617
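
For readers who want a concrete feel for how the Worker side actually runs such a command, here is a minimal, self-contained sketch of launching a child process with ProcessBuilder. It only mirrors the general idea of ExecutorRunner.fetchAndRunExecutor from the previous article; the object name, the hard-coded command, and the working directory are illustrative assumptions, not the real implementation.

import java.io.File
import scala.collection.JavaConverters._

object LaunchBackendSketch {
  def main(args: Array[String]): Unit = {
    // In the real ExecutorRunner the command is built from the ApplicationDescription
    // sent by the driver; here it is hard-coded for illustration.
    val command = Seq(
      "/opt/jdk1.8/bin/java",
      "-cp", "/usr/hdp/2.6.0.3-8/spark2/jars/*",
      "-Xmx1024M",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "--driver-url", "spark://CoarseGrainedScheduler@192.168.1.20:33197",
      "--executor-id", "0",
      "--hostname", "192.168.1.20",
      "--cores", "5",
      "--app-id", "app-20180419170349-0522",
      "--worker-url", "spark://Worker@192.168.1.20:37617")

    val builder = new ProcessBuilder(command.asJava)  // ProcessBuilder takes a java.util.List[String]
    builder.directory(new File("/tmp"))               // working directory of the executor process (illustrative)
    builder.redirectErrorStream(true)                 // merge stderr into stdout
    val process = builder.start()                     // fork the CoarseGrainedExecutorBackend JVM
    val exitCode = process.waitFor()                  // the Worker likewise waits for and reports the exit status
    println(s"CoarseGrainedExecutorBackend exited with code $exitCode")
  }
}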

Executing this command invokes the main method of CoarseGrainedExecutorBackend and starts the CoarseGrainedExecutorBackend process. This process acts as the executor's daemon process and is responsible for creating and maintaining the executor. In this article we walk through the startup of CoarseGrainedExecutorBackend.

The main method, shown below, performs the following steps:

1. Get the command-line arguments

2. Parse the arguments into local variables; if an argument is invalid or missing, print the usage message and exit (a sketch of such a usage helper follows the code below)

3. Pass the parsed variables to the run method, which starts the executor

 def main(args: Array[String]) {
    //  Declare variables to hold the parsed arguments
    var driverUrl: String = null
    var executorId: String = null
    var hostname: String = null
    var cores: Int = 0
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new mutable.ListBuffer[URL]()

    //  Convert the command-line argument array into a List
    var argv = args.toList
    while (!argv.isEmpty) {
      //  Pattern match on the argument list and assign each value to its variable
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }

    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }

    //  Call the run method with the parsed arguments
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }
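
When the arguments are missing or unrecognized, main calls printUsageAndExit. A helper like that typically just prints the expected options to stderr and exits with a non-zero status; the sketch below is a simplified approximation of that idea, not a verbatim copy of the Spark source.

  private def printUsageAndExit(): Unit = {
    // Print the expected options and abort
    System.err.println(
      """
        |Usage: CoarseGrainedExecutorBackend [options]
        |
        | Options are:
        |   --driver-url <driverUrl>
        |   --executor-id <executorId>
        |   --hostname <hostname>
        |   --cores <cores>
        |   --app-id <appid>
        |   --worker-url <workerUrl>
        |   --user-class-path <url>
        |""".stripMargin)
    // Exit with a non-zero status so the launcher sees the start-up as failed
    System.exit(1)
  }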

Now let's look at the run method.

The run method lives in the companion object of CoarseGrainedExecutorBackend and performs the following steps:

1. Create an RpcEnv referred to as fetcher (named "driverPropsFetcher")

2. Send a RetrieveSparkAppConfig message to the driver to fetch the Spark properties (the two messages involved are sketched after the code below)

3. Take the driver's reply, add the properties to a Seq, and then shut down the fetcher RpcEnv

4. Create the executor's own SparkConf and add the properties just fetched to it

5. Call SparkEnv.createExecutorEnv to create the executor's SparkEnv

6. Create the CoarseGrainedExecutorBackend instance and register it with the executor's own rpcEnv

7. Create a WorkerWatcher, which shuts down the CoarseGrainedExecutorBackend if the worker fails

8. Wait until the rpcEnv terminates

 private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
   
      val executorConf = new SparkConf
      val port = executorConf.getInt("spark.executor.port", 0)
      //  Create the fetcher RpcEnv, used to fetch information from the driver
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        hostname,
        port,
        executorConf,
        new SecurityManager(executorConf),
        clientMode = true)
      //  Obtain the driver's EndpointRef for communication
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      //  Send a RetrieveSparkAppConfig message to the driver to fetch the Spark properties
      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
      //  Combine the fetched properties with spark.app.id into a Seq
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      //  Shut down the fetcher RpcEnv
      fetcher.shutdown()

      // Create SparkEnv using properties we fetched from the driver.
      //  Create the executor's own SparkConf
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          //  Add the fetched property to driverConf
          driverConf.set(key, value)
        }
      }
      if (driverConf.contains("spark.yarn.credentials.file")) {
        logInfo("Will periodically update credentials from: " +
          driverConf.get("spark.yarn.credentials.file"))
        SparkHadoopUtil.get.startCredentialUpdater(driverConf)
      }

      //  Create the executor's SparkEnv
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

      //  Create the CoarseGrainedExecutorBackend instance and register it with the executor's own rpcEnv
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      //  Create a WorkerWatcher, which shuts down the CoarseGrainedExecutorBackend if the worker fails
      //  (i.e. the CoarseGrainedExecutorBackend shares its fate with the worker)
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      //  Block until the rpcEnv terminates
      env.rpcEnv.awaitTermination()
      SparkHadoopUtil.get.stopCredentialUpdater()
    }
  }
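
The fetcher round trip in step 2 relies on two messages defined in CoarseGrainedClusterMessages. The shapes below are a simplified reconstruction based on the Spark 2.x source and on how cfg.sparkProperties and cfg.ioEncryptionKey are used above; the exact field list may differ slightly between versions.

  // Simplified shapes of the request and reply messages (Spark 2.x era):
  case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage

  case class SparkAppConfig(
      sparkProperties: Seq[(String, String)],   // becomes cfg.sparkProperties above
      ioEncryptionKey: Option[Array[Byte]])     // becomes cfg.ioEncryptionKey above
    extends CoarseGrainedClusterMessage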

In the run method, the CoarseGrainedExecutorBackend instance is created and registered with the executor's own rpcEnv. Registering an endpoint with an RpcEnv triggers its onStart method, which is then executed.

In onStart, a RegisterExecutor message is sent to the driver, that is, to CoarseGrainedSchedulerBackend:

override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      driver = Some(ref)
      // Send a RegisterExecutor message to the driver
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      case Success(msg) =>
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

Over on the CoarseGrainedSchedulerBackend side, the message is handled as follows:

1. Check whether the executor is already registered

2. Update the in-memory cache: add the executor id and its RpcAddress to the addressToExecutorId HashMap

3. Increase totalCoreCount, the total number of cores the driver has acquired

4. Increment the count of registered executors (totalRegisteredExecutors) by 1

5. Wrap the executor information in an ExecutorData object (a sketch of ExecutorData follows the code below)

6. Put the ExecutorData object into the executorDataMap cache, keyed by executor id

7. Update currentExecutorIdCounter and numPendingExecutors

8. Send a RegisteredExecutor message back to CoarseGrainedExecutorBackend to confirm that registration succeeded

9. Post a SparkListenerExecutorAdded event to the listenerBus

10. Call makeOffers to allocate resources to tasks

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        //  Check whether this executor is already registered
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else
        {
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          //  Log the registration
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          //  Cache the mapping from the executor's RpcAddress to its executor id
          addressToExecutorId(executorAddress) = executorId
          //  Increase totalCoreCount, the total number of cores the driver has acquired
          totalCoreCount.addAndGet(cores)
          //  Increment totalRegisteredExecutors by 1
          totalRegisteredExecutors.addAndGet(1)
          //  Wrap the executor information in an ExecutorData object
          val data = new ExecutorData(executorRef, executorRef.address, hostname,
            cores, cores, logUrls)
          //  Synchronize on the scheduler backend
          CoarseGrainedSchedulerBackend.this.synchronized {
            //  Add the executorId -> data entry to executorDataMap
            executorDataMap.put(executorId, data)
            //  Update currentExecutorIdCounter and numPendingExecutors
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          //  Tell the CoarseGrainedExecutorBackend that the executor registered successfully
          executorRef.send(RegisteredExecutor)
          context.reply(true)
          //  Post a SparkListenerExecutorAdded event to the listenerBus
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          //  Call makeOffers to allocate resources to tasks
          makeOffers()
        }
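
Step 5 above wraps the executor's endpoint and resource information in an ExecutorData object; note that cores is passed twice because the class tracks both the free and the total core count. Roughly, and simplified from the Spark 2.x source (the exact signature may vary by version):

  private[cluster] class ExecutorData(
      val executorEndpoint: RpcEndpointRef,     // used later to send LaunchTask messages to this executor
      val executorAddress: RpcAddress,
      override val executorHost: String,
      var freeCores: Int,                       // decremented as tasks are scheduled on this executor
      override val totalCores: Int,
      override val logUrlMap: Map[String, String])
    extends ExecutorInfo(executorHost, totalCores, logUrlMap)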

Back on the CoarseGrainedExecutorBackend side, once the RegisteredExecutor message is received, the Executor object is created. This Executor object is what later runs the tasks that the driver assigns.

 case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        //  Create the Executor object, which implements most of the actual functionality
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

This concludes our reading of the source code for the startup of CoarseGrainedExecutorBackend.

See you in the next article!






