Reading the Spark Source Code: Launching the Executor

  


Series Guide

If you want to explore the Spark source code, please read the articles in this series in the following order:

1. Spark Submit: Job Submission

2. Registering and Launching the Driver

3. Initializing the SparkContext

4. Creating and Starting the TaskScheduler

5. Registering the Application

6. The Master's Registration Mechanism: Application Registration and the Application Scheduling Algorithm

7. Launching the Executor (this article)

8. Launching CoarseGrainedExecutorBackend

9. DAGScheduler Source Code Analysis

10. DAGScheduler: Stage Splitting and Computing the Best Locations for Tasks

11. TaskScheduler Scheduling Algorithms and Scheduling Pools

12. TaskScheduler: Task Submission and Resource Allocation

13. Launching Tasks

14. Handling Task Results


Following on from the previous article, The Master's Registration Mechanism: Application Registration and the Application Scheduling Algorithm:

After an application registers with the Master and is submitted, the Master eventually calls its schedule method to plan how executor resources will be allocated across the workers, then calls allocateWorkerResourceToExecutors to hand those resources to executors and launch them.

For the details of the schedule method, see the previous article on the Master's registration mechanism and the application scheduling algorithm.
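As context for what follows, the shape of allocateWorkerResourceToExecutors in the Spark 2.x Master is roughly the following (slightly abridged, with comments added); the launchExecutor(worker, exec) call highlighted below is made from this loop:

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // if spark.executor.cores is set, split the assigned cores into that many executors;
    // otherwise create a single executor that takes all of the cores assigned on this worker
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      // record the executor on the ApplicationInfo, then launch it on the worker
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }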


launchExecutor

      //  launch the executor on the chosen worker
      launchExecutor(worker, exec)

Inside launchExecutor, the Master first sends a LaunchExecutor message to the worker, then sends a message to the driver telling it that the executor has been added:

 private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // send a LaunchExecutor message to the worker
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    //  tell the driver that the executor has been added
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }
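On the receiving end, the "driver" this message goes to is the application's StandaloneAppClient endpoint, registered with the Master when the application was submitted. Its handling of ExecutorAdded is just a log line plus a callback to its listener, roughly like this (abridged sketch based on the Spark 2.x source; details vary slightly between versions):

    case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      val fullId = appId + "/" + id
      logInfo("Executor added: %s on %s (%s) with %d core(s)".format(
        fullId, workerId, hostPort, cores))
      // notify the listener (the StandaloneSchedulerBackend) that an executor has been granted
      listener.executorAdded(fullId, workerId, hostPort, cores, memory)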

Over on the Worker side

When the worker receives the LaunchExecutor message, it:

1. First checks whether the message was sent by the alive (active) master

2. Creates the executor's working directory and local temporary directories

3. Wraps the information sent by the master into an ExecutorRunner object; ExecutorRunner manages the execution of a single executor process and is currently only used in standalone mode

4. Calls the ExecutorRunner's start method

5. Records the resources used by the executor

6. Sends a message back to the Master reporting the executor's current state

  // handle the LaunchExecutor message sent by the master
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      //  check whether this message came from the active (alive) master
      if (masterUrl != activeMasterUrl) {
        //  if not, log a warning: an invalid master attempted to launch an executor
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          //  create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          //  create local directories for the executor
          //  the executor picks them up from the SPARK_EXECUTOR_DIRS environment variable
          //  the worker deletes them once the application finishes
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          //  wrap the information received from the master into an ExecutorRunner object
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          //  call ExecutorRunner's start method, which starts a thread
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          //  report the executor's current state to the master
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          // exception handling
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            //  notify the master that the executor's state has changed to FAILED
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

The key piece here is ExecutorRunner's start method:

It creates a workerThread thread which, when run, executes the fetchAndRunExecutor method:

 private[worker] def start() {
    // create the worker thread
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      // the thread's run method just calls fetchAndRunExecutor
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // (registration of a shutdown hook that kills the executor process on worker shutdown is omitted here)
  }

fetchAndRunExecutor takes the information it has been given and proceeds as follows:

1. Creates a ProcessBuilder, which is used to run a command or script locally

2. Sets the ProcessBuilder's working directory to executorDir, the executor working directory the worker created above

3. Sets environment variables on the ProcessBuilder

4. Starts the ProcessBuilder, spawning a process

This is where the command is actually executed, and doing so spawns a CoarseGrainedExecutorBackend process, which is the process the executor runs in.

Running jps on the worker node will show this process.


The command that launches this process looks like this:

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 \
  -cp /usr/hdp/2.6.0.3-8/spark2/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ \
  -Xmx1024M \
  -Dspark.history.ui.port=18081 \
  -Dspark.driver.port=33197 \
  org.apache.spark.executor.CoarseGrainedExecutorBackend \
  --driver-url spark://CoarseGrainedScheduler@192.168.1.20:33197 \
  --executor-id 0 \
  --hostname 192.168.1.20 \
  --cores 5 \
  --app-id app-20180419170349-0522 \
  --worker-url spark://Worker@192.168.1.20:37617

The Spark runtime parameters in this command have already had their placeholders replaced with the actual assigned values by substituteVariables.
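substituteVariables itself is just a pattern match in ExecutorRunner over the {{...}} placeholders that the master put into the command (abridged from the Spark 2.x source):

  /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument. */
  private[worker] def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}" => host
    case "{{CORES}}" => cores.toString
    case "{{APP_ID}}" => appId
    case other => other
  }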

Running this command invokes CoarseGrainedExecutorBackend's main method, which processes the command-line args.
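As a preview of the next article, main does little more than walk the argument list, pull out each "--option value" pair, and hand the results to run(), which sets up the executor's RpcEnv. A heavily abridged sketch (based on the Spark 2.x source; printUsageAndExit and run are helpers in that class):

  def main(args: Array[String]) {
    var driverUrl: String = null
    var executorId: String = null
    var hostname: String = null
    var cores: Int = 0
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new scala.collection.mutable.ListBuffer[java.net.URL]()

    // walk the argument list and pick out each "--option value" pair
    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case "--driver-url" :: value :: tail => driverUrl = value; argv = tail
        case "--executor-id" :: value :: tail => executorId = value; argv = tail
        case "--hostname" :: value :: tail => hostname = value; argv = tail
        case "--cores" :: value :: tail => cores = value.toInt; argv = tail
        case "--app-id" :: value :: tail => appId = value; argv = tail
        case "--worker-url" :: value :: tail => workerUrl = Some(value); argv = tail
        case "--user-class-path" :: value :: tail =>
          userClassPath += new java.net.URL(value); argv = tail
        case Nil =>                       // all arguments consumed
        case tail => printUsageAndExit()  // anything else is an error
      }
    }

    // start the executor's RpcEnv and register the backend with the driver
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }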

5. Redirects the process's stdout to a file

6. Redirects the process's stderr to a file

7. Waits for the executor process's exit code; once the executor has exited, it sends a message to the worker saying that the executor's state has changed

 private def fetchAndRunExecutor() {
    try {
      // Launch the process
      //  create a ProcessBuilder to run the launch command locally
      //  substituteVariables replaces the {{...}} placeholders in the command's arguments
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      //  set the ProcessBuilder's working directory to executorDir, the executor working directory the worker created
      builder.directory(executorDir)
      //  set environment variables for the ProcessBuilder
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      //  start the ProcessBuilder, spawning the executor process
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      //  append the process's stdout to a file
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      //  append the process's stderr to a file
      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      //  block until the executor process exits and get its exit code
      val exitCode = process.waitFor()
      //  the executor process has exited; record its state
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      //  tell the worker that the executor's state has changed
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }

That wraps up the launch of the executor; the rest of the story moves over to CoarseGrainedExecutorBackend.

The next article in this Spark source series will analyze CoarseGrainedExecutorBackend. See you there~
