Spark源码阅读:Application的注册



系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


上一篇文章介绍了TaskScheduler的初始化,这篇文章继续来看StandaloneSchedulerBackend中Spark Application的注册与提交

StandaloneSchedulerBackend作用于Driver内,用于Standalone模式下资源管理器、及Executor通信

StandaloneSchedulerBackend的Start方法中,除了调用父类CoarseGrainedSchedulerBackend的start方法创建DriverEndPoint以外

主要是创建AppClient,AppClient会向Master注册Application,Master通过Application信息为它分配Worker

在创建AppClient之前,还有一些准备过程,获取一些必要参数

 //  获取Driver RpcEndpoint地址
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

    //  启动参数
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    //  executor额外java选项
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    //  executor额外环境变量
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    //  executor额外依赖路径
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
 //  spark java 参数
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    //  整合java 参数
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
 //  将获取的信息封装为Command
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    //  获取application UI地址
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    //  每个executor core数
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)

将上面准备参数和sparkContext相关信息,封装为ApplicationDescription对象

传入ApplicationDescription,创建Standalone AppClient,在创建StandaloneAppClient对象时,会进行StandaloneAppClient的初始化,下面会有讲解

    // 创建ApplicationDescription
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    //  创建Standalone AppClient,并发送ApplicationDescription,AppClient初始化
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    //  创建AppClient后,调用AppClient start方法,向listener返回消息
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    //  等待注册状态更新 waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

发送的ApplicationDescription信息格式如下:

private[spark] case class ApplicationDescription(
    name: String,    //application名字,可以通过spark.app.name进行设置,也可以通过代码中setAppName()进行设置
    maxCores: Option[Int],  //最多可以使用Cpu Cores数量
    memoryPerExecutorMB: Int, //每个Executor使用的内存
    command: Command,   //启动命令
    appUiUrl: String, //application的 UI Url地址
    eventLogDir: Option[URI] = None, //event日志目录
    //  编写事件日志时使用的压缩编解码器的简称,如果有配置的话(例如lzf)
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    // 这个Application想要启动的executor数,只有在动态分配启用时使用
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}

创建AppClient,传入参数rpcEnv,masters,appDesc, this, conf,tihs

这里就要说一下这个this,对应的参数类型为listener

StandaloneAppClientListener是一个Trait,为了确保AppClient在各种时间发生时,及时通知StandaloneSchedulerBackend某些状态的更新

在StandaloneSchedulerBackend创建了AppClient时,需要一个listener,会将自身作为一个参数传递给AppClient,就是这个this。

 /**
  *   当各种事件发生时,能够及时通知StandaloneSchedulerBackend的回调
  *   目前有四个事件回调函数
  *   1.成功连接 2.断开连接  3.添加一个executor  4.移除一个executor
  */
private[spark] trait StandaloneAppClientListener {
  //  向master成功注册application,成功连接到集群
  def connected(appId: String): Unit

  //  断开连接
  // 断开连接可能是临时的,可能是原master故障,等待master切换后,会恢复连接
  def disconnected(): Unit

  //  application由于不可恢复的错误停止
  def dead(reason: String): Unit

  //  添加一个executor
  def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
  //  移除一个executor
  def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
}

在创建StandaloneAppClient时,需要执行OnStart初始化方法,初始化中调用registerWithMaster,向master注册

override def onStart(): Unit = {
      try { //向master注册
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

registerWithMaster方法,实际上是调用了tryRegisterAllMasters,向所有的Master进行注册,在spark中,Master可能是高可靠的(HA),会存在多个Master,所以这里向所有的Master进行注册,但最终只有Active Master响应

/**   向所有的master进行异步注册,将会一直调用registerWithMaster进行注册,直到超过注册时间
      *   一旦成功连接到master,所有调度的工作和Futures都会被取消
      */
    private def registerWithMaster(nthRetry: Int) {
      // 实际上调用了tryRegisterAllMasters,向所有的master进行注册
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }

tryRegisterAllMasters方法,向所有master注册

/**
      *   异步的向master注册,返回一个[Future]数组用来以后取消
      */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            // 向master注册application
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

向Master发出注册application请求后,接收到传过来的ApplicationDescription,将其封装为ApplicationInfo,调用registerApplication注册application

注册成后向StandaloneSchedulerBackend的AppClient发送消息,Application注册成功,listener接受到消息后,状态改变为connected

 //  driver向StandaloneSchedulerBackend的StandaloneAppClient发送消息RegisteredApplication,已注册Application

        driver.send(RegisteredApplication(app.id, self))

最后调用schedule方法进行调度,该方法内部先调用launchDriver向Worker注册并启动driver,然后allocateWorkerResourceToExecutors调用为executor分配资源,接着调用startExecutorsOnWorkers在worker上启动executor,最后再去executor上执行Task。

如果有对application注册等过程不了解的小伙伴们,可以查看之前的文章进行了解:

Spark源码阅读:Master注册机制原理-Application注册

Spark源码阅读:Master注册机制原理-Worker注册与Driver注册

Spark源码阅读:Driver的注册与启动

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

邮箱地址不会被公开。