Reading the Spark Source: Driver Registration and Launch


Series Guide

If you want to work through the Spark source code, please read the articles in this series in the following order:

1. Spark Submit task submission

2. Driver registration and launch

3. SparkContext initialization

4. Creating and starting the TaskScheduler

5. Application registration

6. Master registration mechanism: Application registration and the Application scheduling algorithm

7. Reading the Spark source: launching Executors

8. Starting CoarseGrainedExecutorBackend

9. DAGScheduler source analysis

10. DAGScheduler Stage splitting and best-location computation for Tasks

11. TaskScheduler scheduling algorithms and scheduling pools

12. TaskScheduler task submission and resource allocation

13. Task launch

14. Task result handling


The previous article, Spark Submit task submission, covered how a user submits a job with spark-submit: ClientEndpoint sends a RequestSubmitDriver message to the Master to register the driver.

This article looks at what happens after the Master receives that message: the registration and launch of the driver.


Start in org.apache.spark.deploy.master.Master.scala.

When the Master receives a RequestSubmitDriver message, it does the following:

1. Check whether the Master's state is ALIVE.

2. Call createDriver with the incoming DriverDescription to create the driver, which returns a populated DriverInfo.

3. Persist the DriverInfo with the persistence engine (persistenceEngine).

4. Add the driver to the waitingDrivers queue.

5. Add the driver to the drivers HashSet.

6. Call schedule to kick off scheduling.

    //  Request to submit a driver
    //  Parameter: DriverDescription
    case RequestSubmitDriver(description) =>
      //  Check whether the Master's state is ALIVE
      if (state != RecoveryState.ALIVE) {
        //  If not ALIVE, reply that the submission failed and must go to an ALIVE master
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        //  Log: driver submitted
        logInfo("Driver submitted " + description.command.mainClass)
        //  Create the driver via createDriver
        val driver = createDriver(description)
        //  Persist the DriverInfo with the persistence engine
        persistenceEngine.addDriver(driver)
        //  Add the driver to the waitingDrivers queue
        waitingDrivers += driver
        //  Add the driver to the drivers HashSet
        drivers.add(driver)
        //  Call schedule to schedule resources
        schedule()
        //  Reply that the driver was submitted successfully
        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

The createDriver method wraps the current system time, a new driver id, and the DriverDescription into a DriverInfo and returns it:

  //  Create the driver and return a populated DriverInfo
  private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    //  Wrap the current time, a new driver id, the description, and the date into a DriverInfo
    new DriverInfo(now, newDriverId(date), desc, date)
  }
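The `newDriverId` helper is not shown above. As a sketch of how the id is built, the `driver-yyyyMMddHHmmss-%04d` shape matches ids the Master actually produces (e.g. `driver-20180423114006-0009` in the launch command later in this post), but treat the counter handling here as illustrative:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object DriverIdSketch {
  // Timestamp portion of the driver id, e.g. 20180423114006
  private val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  // Monotonically increasing counter, zero-padded to four digits
  private var nextDriverNumber = 0

  // Builds ids of the form driver-<timestamp>-<4-digit counter>
  def newDriverId(submitDate: Date): String = {
    val id = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
    nextDriverNumber += 1
    id
  }
}
```

Because the counter lives on the Master, two drivers submitted in the same second still get distinct ids.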

Next, look at the schedule method:

1. Check the Master's current state.

2. Shuffle the ALIVE workers, producing a randomly ordered worker collection.

3. Iterate over waitingDrivers and, for each driver, walk the shuffled ALIVE workers; if a worker's free memory and cores are at least what the driver requires, call launchDriver to start the driver on that worker, remove the driver from the waitingDrivers queue, and mark it as launched.

4. Finally, call startExecutorsOnWorkers.

  /**
    *   Allocate the currently available resources among waiting applications.
    *   Called whenever a new application joins or resource availability changes.
    */
  private def schedule(): Unit = {
    //  First check the Master's state; if it is not ALIVE, do nothing
    if (state != RecoveryState.ALIVE) {
      return
    }
    //  Drivers take precedence over executors
    //  Shuffle the ALIVE workers to get a randomly ordered worker collection
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    //  Number of workers currently ALIVE
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    //  Iterate over a copy of the waitingDrivers collection,
    //  assigning a worker to each waiting driver.
    //  For each driver, start from the position after the last assignment
    //  and keep moving forward until every ALIVE worker has been explored.
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var numWorkersVisited = 0
      //  While fewer workers have been visited than are ALIVE and the driver has not launched
      while (numWorkersVisited < numWorkersAlive && !launched) {
        //  Pick a worker from the shuffled ALIVE workers
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        //  If the worker's free resources cover the driver's requirements, launch the driver on it
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          //  Call launchDriver
          launchDriver(worker, driver)
          //  Remove the driver from the waitingDrivers queue
          waitingDrivers -= driver
          //  Mark the driver as launched
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    //  Call startExecutorsOnWorkers
    startExecutorsOnWorkers()
  }
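The placement loop is easier to see in isolation. Below is a minimal, self-contained sketch of the same round-robin walk over shuffled workers, with hypothetical FakeWorker/FakeDriver classes standing in for WorkerInfo and DriverInfo. (Note the real schedule() leaves the resource bookkeeping to launchDriver; the sketch subtracts resources inline so it stands alone.)

```scala
import scala.util.Random

// Hypothetical stand-ins for WorkerInfo and DriverInfo
case class FakeWorker(id: String, var memFree: Int, var coresFree: Int)
case class FakeDriver(id: String, mem: Int, cores: Int)

// Round-robin over a shuffled worker list, as in Master.schedule()
def place(workers: Seq[FakeWorker], drivers: Seq[FakeDriver]): Map[String, String] = {
  var placed = Map.empty[String, String] // driver id -> worker id
  if (workers.isEmpty) return placed
  val shuffled = Random.shuffle(workers)
  var curPos = 0
  for (d <- drivers) {
    var visited = 0
    var launched = false
    // Visit each worker at most once per driver, continuing from curPos
    while (visited < shuffled.size && !launched) {
      val w = shuffled(curPos)
      visited += 1
      if (w.memFree >= d.mem && w.coresFree >= d.cores) {
        // "Launch": reserve the resources and record the assignment
        w.memFree -= d.mem
        w.coresFree -= d.cores
        placed += (d.id -> w.id)
        launched = true
      }
      curPos = (curPos + 1) % shuffled.size
    }
  }
  placed
}
```

Because curPos keeps advancing between drivers, consecutive drivers tend to land on different workers rather than piling onto the first one with capacity.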

Next, the launchDriver method. It performs these steps:

1. Record the driver in the WorkerInfo.

2. Record the worker in the DriverInfo.

3. Send a LaunchDriver message through the worker's RpcEndpointRef, telling the worker to start the driver.

4. Update the driver's state to RUNNING.

  //  Launch the driver
  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    //  Log: launching driver on worker
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    //  Record the driver in the WorkerInfo
    worker.addDriver(driver)
    //  Record the worker in the DriverInfo
    driver.worker = Some(worker)
    //  Send a LaunchDriver message through the worker's RpcEndpointRef
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    //  Update the driver's state to RUNNING
    driver.state = DriverState.RUNNING
  }

Driver registration is now complete. Move on to org.apache.spark.deploy.worker.Worker.scala.

When the Worker receives the LaunchDriver message, it:

1. Wraps the driver information in a DriverRunner object.

2. Stores the new DriverRunner in the drivers HashMap.

3. Calls the DriverRunner's start method.

4. Records the cores and memory now in use.

    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      //  Wrap the driver information in a DriverRunner object
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      //  Store the new DriverRunner in the drivers HashMap
      drivers(driverId) = driver
      //  Call start
      driver.start()
      //  Record coresUsed and memoryUsed
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

Now look at DriverRunner's start method, which:

1. Calls prepareAndRunDriver to fetch the driver's jar and run the driver.

2. Sets the final state based on the exit code returned by prepareAndRunDriver.

3. Sends the worker a DriverStateChanged message so the worker knows the driver's state changed.

  /**
    *  Start a thread to run and manage the driver
    */
  private[worker] def start() = {
    //  Start a thread
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          //  Prepare the driver's jar dependencies and run the driver; returns an exit code
          val exitCode = prepareAndRunDriver()

          //  Set finalState from the exit code: finished, killed, or failed
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        //  Send the worker a DriverStateChanged message announcing the state change
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }

prepareAndRunDriver executes the following steps:

1. Create the driver's working directory.

2. Download the user jar into that directory.

3. Define substituteVariables to replace placeholder arguments with concrete values.

4. Build a ProcessBuilder from the driver launch command, the securityManager, the driver's memory requirement, the absolute spark home path, and the substitution function.

5. Call runDriver.

  private[worker] def prepareAndRunDriver(): Int = {
    //  Create the driver's working directory
    val driverDir = createWorkingDirectory()
    //  Download the user jar into the working directory
    val localJarFilename = downloadUserJar(driverDir)

    //  Substitute placeholder arguments with concrete values
    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    //  Build the ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    //  Run the driver
    runDriver(builder, driverDir, driverDesc.supervise)
  }

In runDriver, the process's InputStream and ErrorStream are redirected to stdout and stderr files.

These are the two log files you see when you click Logs on the Spark application web UI.

runDriver defines an initialize function that:

1. Creates the stdout and stderr files.

2. Formats the builder's command into a header line written to stderr.

3. Redirects the process's error stream to the stderr file.

Finally, runDriver calls runCommandWithRetry.

  private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)
    //  Initialization: wire the process's streams to log files
    def initialize(process: Process): Unit = {
      // Redirect stdout and stderr to files
      //  Create the stdout file
      val stdout = new File(baseDir, "stdout")
      //  Redirect the process's InputStream to the stdout file
      CommandUtils.redirectStream(process.getInputStream, stdout)
      //  Create the stderr file
      val stderr = new File(baseDir, "stderr")
      //  Format the builder's command into a header line
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      //  Redirect the process's error stream to the stderr file
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
    //  Call runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }
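CommandUtils.redirectStream is essentially a background copy loop from a stream into a file. A simplified stand-in (assumed behavior for illustration, not the actual Spark implementation) looks like this:

```scala
import java.io.{File, FileOutputStream, InputStream}

// Simplified stand-in for CommandUtils.redirectStream: pipe a stream
// into a file on a daemon thread so the caller is never blocked.
def redirectStream(in: InputStream, file: File): Thread = {
  val out = new FileOutputStream(file, true) // append, like the driver log files
  val t = new Thread("redirect to " + file.getName) {
    override def run(): Unit = {
      val buf = new Array[Byte](1024)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.close()
    }
  }
  t.setDaemon(true)
  t.start()
  t
}
```

The daemon flag matters: if the worker JVM shuts down, a stuck copy thread should not keep it alive.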

runCommandWithRetry is where the command is actually executed on the system; it returns the process's exit code.

  private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
    //  Default exit code is -1
    var exitCode = -1
    // Time to wait between submission retries.
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        //  If killed, return the exit code
        if (killed) { return exitCode }
        //  Execute the command, starting the process
        process = Some(command.start())
        //  Call the initialize function
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()

      //  Wait for the process and get its exit code
      exitCode = process.get.waitFor()

      //  Check whether to attempt another run
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        //  A run that lasted long enough resets the back-off
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }
    //  Return the exit code
    exitCode
  }
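Note that the loop only repeats when supervise is true, the exit code is nonzero, and the driver has not been killed, so it cannot spin forever on a healthy driver. To make the back-off concrete, here is a small standalone sketch (not Spark code) that replays the wait-time logic: waitSeconds doubles after each failed run and resets to 1 whenever a run lasted longer than successfulRunDuration (5 seconds):

```scala
// Computes the successive waits (in seconds) for a sequence of run
// durations, mirroring the back-off logic in runCommandWithRetry.
def backoffWaits(runDurations: Seq[Int], successfulRunDuration: Int = 5): Seq[Int] = {
  var waitSeconds = 1
  runDurations.map { ran =>
    // A run that survived long enough resets the back-off
    if (ran > successfulRunDuration) waitSeconds = 1
    val w = waitSeconds
    waitSeconds *= 2 // exponential back-off for the next failure
    w
  }
}
```

For example, runs lasting 1, 1, 1, 10, and 1 seconds produce waits of 1, 2, 4, 1, 2: three quick crashes double the wait each time, the 10-second run resets it, and the back-off starts over.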

The command actually executed on the system looks like the following (yours may differ; what runs in your environment is authoritative).

This is the real submission: the command that launches the Driver.

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ 
-Xmx1024M 
-Dspark.history.kerberos.keytab=none 
-Dspark.eventLog.enabled=true 
-Dspark.yarn.historyServer.address=192.168.1.20:18081 
-Dspark.jars=file:/usr/hdp/2.6.0.3-8/spark2/bin/../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar 
-Dspark.submit.deployMode=cluster 
-Dspark.history.ui.port=18081 
-Dspark.executor.memory=2G
-Dspark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 
-Dspark.yarn.queue=default 
-Dspark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 
-Dspark.history.fs.logDirectory=hdfs:///spark2-history/ 
-Dspark.driver.supervise=false 
-Dspark.app.name=org.apache.spark.examples.SparkPi
-Dspark.history.kerberos.principal=none 
-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider 
-Dspark.cores.max=5 
-Dspark.rpc.askTimeout=10s 
-Dspark.master=spark://192.168.1.20:7077 
-Dspark.eventLog.dir=hdfs:///spark2-history/
org.apache.spark.deploy.worker.DriverWrapper
spark://Worker@192.168.1.20:40612  /usr/hdp/2.6.0.3-8/spark2/work/driver-20180423114006-0009/spark-examples_2.11-2.1.0.2.6.0.3-8.jar 
org.apache.spark.examples.SparkPi 
1000


Removing a driver:

The DriverRunner matches on the exit code and notifies the worker that the driver's state has changed.

When the worker receives that message, it:

1. Forwards the DriverStateChanged message to the Master.

2. Removes the driver from the drivers collection.

3. Adds the driver to the finished-drivers collection.

4. Releases the memory and CPU cores that were allocated to the driver.

  private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    // driver id
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    //  Match on the state and log accordingly
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
    //  Forward the DriverStateChanged message to the master
    sendToMaster(driverStateChanged)
    //  Remove the driver from the drivers collection
    val driver = drivers.remove(driverId).get
    //  Add the driver to finishedDrivers
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    //  Release the driver's memory and CPU cores
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }
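trimFinishedDriversIfNecessary is not shown above; conceptually it keeps finishedDrivers bounded by evicting the oldest entries once a retained limit is exceeded. A hedged sketch, assuming an insertion-ordered map (the real Worker keeps finishedDrivers in a LinkedHashMap and reads the limit from configuration):

```scala
import scala.collection.mutable

// Insertion-ordered, so the first entries are the oldest finished drivers
val finishedDrivers = mutable.LinkedHashMap.empty[String, String] // driverId -> info
val retainedDrivers = 3 // stands in for the worker's retained-drivers setting

// Evict the oldest finished drivers once the retained limit is exceeded.
def trimFinishedDriversIfNecessary(): Unit = {
  if (finishedDrivers.size > retainedDrivers) {
    // take(...) copies the oldest entries, so removal during the loop is safe
    finishedDrivers.take(finishedDrivers.size - retainedDrivers)
      .foreach { case (id, _) => finishedDrivers.remove(id) }
  }
}
```

Without this cap, a long-lived worker supervising many short drivers would leak memory through its finished-drivers history.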

When the Master receives the message, it matches on the state; if the state is ERROR, FINISHED, KILLED, or FAILED, it calls removeDriver to remove the driver.

    case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }

removeDriver performs a series of removal steps:

1. Remove the driver from the drivers collection.

2. Add the driver to the completedDrivers collection, trimming it first if it has reached RETAINED_DRIVERS.

3. Remove the driver from the persistence engine.

4. Set the driver's state to its final state.

5. Remove the driver from its worker.

6. Call schedule to re-schedule resources.

  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    //  Find the driver by its id
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //  Remove the driver from the drivers collection
        drivers -= driver
        //  Trim completedDrivers if it has reached the retained limit
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        //  Add the driver to the completedDrivers collection
        completedDrivers += driver
        //  Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        //  Set the driver's state to its final state
        driver.state = finalState
        driver.exception = exception
        //  Remove the driver from its worker
        driver.worker.foreach(w => w.removeDriver(driver))
        //  Call schedule to re-schedule the freed resources
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

That completes the registration and launch of the Driver.

See you in the next article!

If you made it this far, give the post a like!

赫墨拉

I'm a fledgling big-data enthusiast, and this blog is where I share my growth and experience.
