Spark源码阅读:Application的注册与Application的调度算法

  


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


继上文Spark源码阅读:Application的注册

StandaloneAppClient向Master发送注册application消息后

masterRef.send(RegisterApplication(appDescription, self))

进入org.apache.spark.deploy.master,查看Master处理Application注册的源码

Master接收到消息后,做出如下操作

1.判断Master状态

2.传入applicationDescription和driver,调用createApplication方法,创建application,得到封装的ApplicationInfo,即app

3.将app传入,注册application,将app加入内存缓存,将app加入等待队列

4.持久化引擎将app持久化

5.driver向StandaloneAppClient发送消息,application已注册,即RegisteredApplication

6.最后调用scheduler方法,安排调度

/**
      * 处理Application注册的请求
      */
    case RegisterApplication(description, driver) =>
      //  防止一些Driver的重复注册
      //  如果master的状态是standby时,注册Application,无视该注册,没有反馈
      if (state == RecoveryState.STANDBY) {
      } else {
        logInfo("Registering app " + description.name)
        //  传入description和driver,去创建Application,得到一个封装好的ApplicationInfo
        val app = createApplication(description, driver)
        //  将封装好的ApplicationInfo传入,注册Application
        //  将app加入内存缓存,将app加入等待队列
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //  使用持久化引擎将app持久化
        persistenceEngine.addApplication(app)
        //  driver发送消息,向StandaloneSchedulerBackend的StandaloneAppClient发送消息RegisteredApplication,已注册Application
        driver.send(RegisteredApplication(app.id, self))
        //  然后调用schedule方法,去进行调度
        schedule()
        //  至此,Application注册完成
      }

从createApplication开始,该方法根据将当前时间戳,ApplicationDescription,app id,日期,driver,默认cpu core数信息封装成了ApplicationInfo,并返回

 //  创建Application,返回封装好的ApplicationInfo
  private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
      ApplicationInfo = {
    //  获取当前系统时间
    val now = System.currentTimeMillis()
    //  根据时间戳生成日期
    val date = new Date(now)
    //  根据实际生成application id
    val appId = newApplicationId(date)
    //  将时间戳,ApplicationDescription,app id,日期,driver,默认cpu core数信息封装成了ApplicationInfo,并返回
    new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
  }

收返回的ApplicationInfo,即app,接下来注册application调用registerApplication

1.首先判断driver地址是否存在,避免重复注册application

2.将app添加到apps HashSet中

3.将app id添加到idToApp HashMap中

4.将app driver添加到endpointToApp HashMap中

5.将app rpc地址添加到addressToApp HashMap中

6.将app添加到等待执行waitingApps ArrayBuffer中

 //  注册application
  private def registerApplication(app: ApplicationInfo): Unit = {
    // 获取driver地址
    val appAddress = app.driver.address
    // 判断driver地址是否存在,避免重复注册
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
    applicationMetricsSystem.registerSource(app.appSource)
    // 将app加入HashSet中
    apps += app
    //  将app id添加到idToApp HashMap中
    idToApp(app.id) = app
    //  将app driver添加到endpointToApp HashMap中
    endpointToApp(app.driver) = app
    //  将app rpc地址添加到addressToApp HashMap中
    addressToApp(appAddress) = app
    //  将app添加到等待执行waitingApps队列中
    waitingApps += app
    if (reverseProxy) {
      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
    }
  }

接下来持久化引擎将app持久化

driver向StandaloneAppClient发送消息,application已注册,即RegisteredApplication

重点为最后调用的schedule( )方法

来看scheduler方法,前文driver注册与启动阅读过相关driver源码,略过driver部分

1.首先判断master的状态

2.调用startExecutorsOnWorkers方法

 /**
    *   在等待的应用程序中分配当前可用的资源
    *   每当一个新的应用程序加入或资源可用性改变时,这个方法将被调用。
    */
  private def schedule(): Unit = {
    //  先来判断,当前master的状态,如果不是Alive,什么都不做
    if (state != RecoveryState.ALIVE) {
      return
    }
   
    //  调用startExecutorsOnWorkers方法
    startExecutorsOnWorkers()
  }

startExecutorsOnWorkers

先对worker资源进行筛选,cpu core数多的worker排在前面,将可用的worker添加到可用的usableWorkers集合中

调用scheduleExecutorsOnWorkers,为Application分配要在Worker上启动的Executor运行的Cpu Core数

分配好了executor 的core数

最后调用allocateWorkerResourceToExecutors,按照分配规则,分配worker资源给execut

 //  在worker上调度和启动executor
  private def startExecutorsOnWorkers(): Unit = {
    //  Spark默认为application启动Executor方式是FIFO调度,所有的application都是放在调度等待队列中

    //  判断app剩余core数是否大于0,如果大于,遍历waitingApps
    for (app <- waitingApps if app.coresLeft > 0) {
      // 每个executor的cpu core
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      //  获取可用的worker,过滤出Alive Worker,且该worker的资源大于app对每个executor所需资源,并根据CPU数进行排序,CPU数空闲多的排在前面
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      //  分配core数,给worker分配executor
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      //  已经决定了每个Worker分配了多少个cores,开始分配
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

关于scheduleExecutorsOnWorkers,该方法为Application分配要在Worker上启动的Executor返回一个数组,包括每个worker分配到的core数

这里就要说一下重点的Spark Application调度算法了

该调度算法为已注册的Application(但没有完成调度的),调度Worker节点上的Executor

有两种调度算法:

第一种:Spread Out 

将一个Application的Executor Core数尽可能分配到更多的Worker上,甚至分配集群上所有的Worker上,这种方式往往会带来潜在的更好的数据本地性

第二种:非Spread Out 

尝试将worker节点的空余的Cores都用尽了,才会去下一个worker节点分配,第二种方法是默认的,默认情况下,每个Worker只能启动一个Executor

为了更好的理解,举个例子


现有5个Worker,每个节点剩余资源如下,根据Core排序:

剩余Core 剩余内存
Worker1 7 5G
Worker2 6 3G
Worker3 3 4G
Worker4 2 300M
Worker5 1 2G

现有一个用户提交application,该application最多需要12个core,每个executor 内存为500M,每个executor core数为2

在StartExecutorsOnWorkers方法中先对worker进行筛选,Worker4 不满足每个executor内存需求,Worker 5不满足每个executor core需求

将筛选出来的Worker1、Worker2、Worker3添加到usableWorkers中,下面就要对资源进行调度

Spread Out 

第一次调度后,剩余资源如下

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 5 4.5G 2 500M
Worker2 6 3G 0 0
Worker3 3 4G 0 0

如上,可以发现只有Worker 1的可用资源变少了,Worker 2,3没有变化,因为第一次调度时,只在Worker 1上分配了一个executor,这个executor占用2个core,500M内存

接下来去Worker 2上去分配,分配后剩余资源如下

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 5 4.5G 2 500M
Worker2 4 2.5G 2 500M
Worker3 3 4G 0 0

然后去Worker 3上分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 5 4.5G 2 500M
Worker2 4 2.5G 2 500M
Worker3 1 3.5G 2 500M

现在已经分配了3个executor,6个core,1.5G内存,但是资源还没有分配完,接着进行下一轮分配

还是先给Worker 1进行分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 3 4G 4 1G
Worker2 4 2.5G 2 500M
Worker3 1 3.5G 2 500M

给Worker 2分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 3 4G 4 1G
Worker2 2 2G 4 1G
Worker3 1 3.5G 2 500M

给Worker 3进行分配的时候,发现Worker 3的资源已经不够了,所以,又来了一轮,再给Worker 1进行分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 1 3.5G 6 1.5M
Worker2 4 2.5G 4 1G
Worker3 1 3.5G 2 500M

现在12个core全部分配,资源分配完毕,共启动了6个executor,worker 1上有3个executor,worker 2上有2个executor,worker 3上有1个executor

非Spread Out 模式

给Worker 1进行分配资源

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 5 4.5G 2 500M
Worker2 6 3G 0 0
Worker3 3 4G 0 0

由于Worker 1上还有资源可以分配,所以继续给Worker 1分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 3 4G 4 1G
Worker2 6 3G
Worker3 3 4G

Worker 1上仍然有资源可以分配,继续给Worker 1分配

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 1 3.5G 6 1.5G
Worker2 6 3G
Worker3 3 4G

分配3次之后,Worker 1资源已经不够用了,这里还有6个core没有分配,去Worker 2上进行分配,以此类推,直到分配的资源满足application,停止分配,最后Worker资源如下

剩余Core 剩余内存 分配的Core 分配的内存
Worker1 1 3.5G 6 1.5G
Worker2 0 1.5G 6 1.5G
Worker3 3 4G

栗子举完了,下面来看源码,会更好理解

1.指定每个executor需要的资源

2.通过指定的资源去找符合要求的worker,其中canLaunchExecutor,就是其中找符合要求的过程,即worker的资源是否满足启动executor条件,得到一个满足条件的worker列表

3.再根据调度模式,遍历满足条件的worker列表

如果是Spread Out,给第一个得到的worker启动executor,然后结束分配资源,去下一个Worker继续进行分配executor,然后去下一个worker进行分配,继续去下一个worker分配,直到满足了application的需求

如果是非Spread Out模式,则一直在这个worker上启动executor,直到这个worker资源分配尽,如果还没有满足application的需求,则去下一个worker上一直分配,以此类推,直到满足application的需求

4.最后返回给每个worker分配好的core数组

 private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    //  每个Executor的core数
    val coresPerExecutor = app.desc.coresPerExecutor
    //  每个Executor最小分配core数
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    //  每个worker一个executor,布尔类型
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    //  每个Executor的内存
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    //  numUsable array长度
    val numUsable = usableWorkers.length
    //  Worker上已经分配的core数
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    // Worker上已经分配的executor数量
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    //  app实际会分配的Cpu的数量,取app剩余的还没有被分配的CPU的数量和可以使用的workers的cpu数量总和之间的最小值
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)  //  是否可以在Worker上分配Executor
    def canLaunchExecutor(pos: Int): Boolean = {

      //  判断是否能一直调度,需要分配的core大于等于executor最小分配core数
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      //  判断worker是否还有足够的core资源
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      //  如果允许worker为同一个application启动多个executor,或者在worker上没有启动过这个application的executor,那么就会创建一个executor
      //  如果在这个worker上已经有executor,则给这个executor更多的cpu core
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        //  分配的内存总量 = 分配的Executor数 * 每个executor分配的内存
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        //  判断是否有足够的内存
        //  可用的worker剩余内存 - 分配的内存总量 >= 每个executor分配的内存
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        //  判断是否满足application需求
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // oneExecutorPerWorker机制下,不检测内存和executor限制,很重要
        keepScheduling && enoughCores
      }
    }

    //  通过canLaunchExecutor判断是否能启动executor
    // 如果能,则一直启动executor,直到该Worker资源不足,或满足了Application需求
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    //  开始在worker上分配
    while (freeWorkers.nonEmpty) {
      //  遍历freeWorkers
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        //  再次判断是否能启动executor
        while (keepScheduling && canLaunchExecutor(pos)) {
          //  更新已经分配的core数
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          //  如果只允许每个worker上启动一个executor,那么worker分配的executor数量为1
          //  如果允许worker启动多个executor,就是在原来的executor上加1
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          //  Spreading out Application方式意味着在尽可能多的Worker中分配executor。
          //  如果不使用Spreading out算法,就在一个worker上持续分配executor,直到用完worker的资源
          //  如果使用Spreading Out算法,就去下一个worker上分配executor

          //  如果使用Spreading Out算法,将keepScheduling修改为false
          // 不需要在这个worker上继续进行分配,去下一个worker上分配
          if (spreadOutApps) {
            keepScheduling = false
          }
        }

      }
      //  过滤不满足条件的worker
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    //  返回每个worker应该被分配的core数
    assignedCores
  }

scheduleExecutorsOnWorkers分配好资源后,返回分配的数组,接下来就是对每个worker按照分配的数组进行分配资源

调用allocateWorkerResourceToExecutors给每个worker进行分配

allocateWorkerResourceToExecutors方法中

1.计算这个worker需要分配executor数量

2.获取worker上每一次为executor需要分配的cores的数量

3.根据需要分配的executor数量,调用app.addExecutor为application添加executor信息

4.调用launchExecutor,启动executor

5.将application的状态修改为Running

  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {

    //  这个worker需要分配executor数量
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    //  这个worker上每一次为executor需要分配的cores的数量
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      //  调用addExecutor方法,为application添加executor
      val exec = app.addExecutor(worker, coresToAssign)
      //  启动executor
      launchExecutor(worker, exec)
      //  将application的状态改为Running
      app.state = ApplicationState.RUNNING
    }
  }

至此,Master处理application的注册和application的调度算法看完了。

下一篇文章,继续阅读launchExecutor,查看exeuctor的启动。

下篇文章见~

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

发表评论

邮箱地址不会被公开。