Spark源码阅读:TaskScheduler任务提交与资源分配


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


上一篇文件介绍了TaskSchedulerImpl的调度算法和调度池

TaskSchedulerImpl按照调度算法将资源分配给Task,TaskSchedulerImpl任务的调度基于调度池

了解相关内容,请戳:Spark源码阅读:TaskScheduler调度算法与调度池


这篇文章继续对TaskSchedulerImpl的中Task提交进行阅读

提交Task

在DAGScheduer对Stage划分,并对Task最佳位置进行了计算后(了解相关内容,请访问:Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算

在DAGScheduer将Task提交给TaskScheduler之前,将多个Task打包为TaskSet后(TaskSet是整个调度中对Task进行调度管理的基本单位),调用taskSchedulersubmitTasks方法

实则是调用了TaskSchedulerImplsubmitTasks方法

查看submitTasks方法,做了如下事情:

1.获取TaskSet中的task数组

2.(重点)为每个taskSet创建了一个TaskSetManager(任务集合管理器),用于管理TaskSet,包括任务推断,Task本地性,并对Task的资源进行分配

3.向调度池中添加刚才创建TaskSetManager

4.判断应用程序是否为local模式并且没有接收到Task

如果是创建一个定时器,通过指定时间检查TaskSchedulerImpl的饥饿状况

如果接收到Task,则取消这个定时器

5.同步块中最后判断TaskSchedulerImpl是否接收到Task

如果接收到Task,则调用backend.reviveOffers()方法,为Task分配资源

  override def submitTasks(taskSet: TaskSet) {
    //  获取taskSet中task
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //  为每个taskSet创建了一个TaskSetManager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      //  获取stage id
      val stage = taskSet.stageId
      //  由于TaskSetManager不是线程安全的,所有对它的访问都应该同步,所以添加了synchronized修饰
      //  并创建一个HashMap,用于存储stage对应的TaskSetManager
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      //  将TaskSetManager加入刚才创建的HashMap中,进行缓存
      stageTaskSets(taskSet.stageAttemptId) = manager

      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //  向调度池中添加刚才创建的TaskSetManager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      //  判断应用程序是否为local模式并且TaskSchedulerImpl没有接收到Task
      if (!isLocal && !hasReceivedTask) {
        //  创建一个定时器,通过指定时间检查TaskSchedulerImpl的饥饿状况
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            //  如果TaskSchedulerImpl接收到Task,则取消这个定时器
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      //  判断是否接受到Task
      hasReceivedTask = true
    }
    //  给Task分配资源
    backend.reviveOffers()
  }

资源分配

接下来,调用backend的reviveOffers方法为Task分配资源

我们的提交模式为Standalone,所以当前的backend为StandaloneSchedulerBackend

StandaloneSchedulerBackend的reviveOffers方法是继承其父类的CoarseGrainedSchedulerBackend的reviveOffers方法

来看CoarseGrainedSchedulerBackend的reviveOffers方法:向driver Endpoint发送消息ReviveOffers,driver Endpoint实则就是CoarseGrainedSchedulerBackend在启动时创建的driver Endpoint

 override def reviveOffers() {
    //  向driver Endpoint发送消息ReviveOffers
    driverEndpoint.send(ReviveOffers)
  }

在CoarseGrainedSchedulerBackend driver Endpoint接受到发送来的ReviveOffers消息后,进行模式匹配,调用makeOffers方法

case ReviveOffers =>
        makeOffers()

makeOffers方法向所有可用executor分配虚假资源

该方法内容如下:

1.筛选掉死去的executors

2.将这个application所有可用的executor,将其封装为workOffers,每个workOffers代表了每个executor可用的cpu资源数量

3.调用scheduler的resourceOffers方法,将workOffers传入,实际调用的是TaskSchedulerImpl的resourceOffers方法

4.获取scheduler.resourceOffers的返回值,调用launchTasks方法,启动task

 //  向所有的executor提供虚假资源
    private def makeOffers() {
      //  筛选掉死掉的executor
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
     //  将这个application所有可用的executor,将其封装成了workOffers,每个workOffers代表了每个executor可用的cpu资源数量
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      // 调用scheduler.resourceOffers的方法,将workOffers传入
      //  获取scheduler.resourceOffers的返回值,调用launchTasks方法
      launchTasks(scheduler.resourceOffers(workOffers))
    }

在启动task之前还有重要的一步,就是TaskSchedulerImpl的resourceOffers方法

了解这个方法前,先来说一下Task的本地性

Task本地性

Spark现在支持五种本地化级别,级别从高到低顺序为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY

PROCESS_LOCAL进程本地化:task要计算的数据在同一个Executor中,即同一个JVM中

NODE_LOCAL 节点本地化:速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取

NO_PREF:没有偏好

RACK_LOCAL 机架本地化:数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比NODE_LOCAL慢

ANY:跨机架,数据在非同一机架的网络上,速度最慢

Task的本地性是如何进行运算的呢?

在TaskSetManger中有一个变量myLocalityLevels,即TaskSet的本地化级别,通过调用TaskSetManger的computeValidLocalityLevels方法得到

感兴趣的小伙伴,请自行阅读


来看resourceOffers方法,该方法负责进行资源分配,步骤如下:

1.遍历WorkerOffer

(1)更新host与executor的映射关系

(2)标记添加了新的executor

(3)更新host与机架之间的关系

2.可用的executor进行shuffle分散,避免将task放在同一个worker上,进行负载均衡

3.根据每个WorkerOffer的可用的cpu核数创建同等尺寸的TaskDescription数组

4.将每个WorkerOffer的可用的cpu核数统计到availableCpus数组中

5.按照调度算法排序从调度池中获取排序的taskSet队列

6.重点:taskSet任务分配算法

算法中,执行步骤如下:

(1) 遍历taskSet,从最快的本地化级别开始,调用resourceOfferSingleTaskSet方法,给每个Task Set中Task进行分配资源

看一下resourceOfferSingleTaskSet方法:

遍历WorkerOffer,如果当前executor的cpu数大于每个task所使用的cpu数量,则选择在该executor上启动task,调用taskSet的resourceOffer方法,在executor上,使用这次本地化级别,查看那些task可用启动

即对当前taskSet,尝试使用最快的本地化级别PROCESS_LOCAL,给task分配资源,让task在executor上启动,如果启动异常,跳出循环,换下一种本地化级别尝试直到taskSet在某个本地化级别下,task在executor上启动

private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    //  是否启动task
    var launchedTask = false

    //遍历所有WorkerOffer
    for (i <- 0 until shuffledOffers.size) {
      //  获取executorId
      val execId = shuffledOffers(i).executorId
      //  获取host
      val host = shuffledOffers(i).host

      // 如果当前executor可用的CPU大于每个task使用的cpu数量(默认为1)
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          //  调用taskSet的resourceOffer方法,在executor上,使用这次本地化级别,查看那些task可用启动
          //  遍历TaskSet中Task,使用当前本地化级别启动task,在executor上启动task
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            // 放入二维数组,给指定的executor添加符合条件的task
            tasks(i) += task

            //  将相应的信息cache
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            //  标识为true
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            return launchedTask
        }
      }
    }
    return launchedTask
  }

(2)如果在所有TaskSet所允许的本地级别下,TaskSet中没有任何一个Task成功启动,调用taskSet的abortIfCompletelyBlacklisted方法,将其添加到黑名单,放弃该task

(3)返回已经获得资源的task列表

taskSet任务分配算法代码如下:

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    //  遍历WorkerOffer
    for (o <- offers) {
      //  判断HashSet中是否存在executor的Host
      if (!hostToExecutors.contains(o.host)) {
        //  如果不存在,则添加
        hostToExecutors(o.host) = new HashSet[String]()
      }

      //   判断正在运行的Task对应的executor id HashMap集合中是否存在executor id
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        //  如果不存在则添加executor等信息
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    //  首先将可用的executor进行shuffle分散,避免将task放在同一个worker上,进行负载均衡
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // 根据每个WorkerOffer的可用的cpu核数创建同等尺寸的TaskDescription数组
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    //  将每个WorkerOffer的可用的cpu核数统计到availableCpus数组中
    val availableCpus = shuffledOffers.map(o => o.cores).toArray

    //  创建完TaskSchedulerImpl和SchedulerBackend后,对TaskSchedulerImpl调用方法initialize进行初始化
    //  创建Pool调度池,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息
    //  这里的rootPool就是之前创建好的调度池,在执行task分配时,要从调度池中获取排序的taskSet队列
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }
 
    //  taskSet任务分配算法,双重for循环,遍历所有的taskSet,以及myLocalityLevels本地化级别
    //  遍历taskSet,从最快的本地化级别开始,调用resourceOfferSingleTaskSet方法
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
          //  对当前taskSet,尝试优先使用最快的本地化级别,为task分配资源,在executor上进行启动task
          //  如果启动异常,跳出循环,换下一种本地化级别
          //  尝试直到taskSet在某个本地化级别下,在executor上成功启动task
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    //  返回已经获得资源的task列表
    return tasks
  }

至此,整个Task的提交、分配过程完成。

VV花了一张图,总结了整个Task提交流程,如下图:

下一篇文章,会对最后的lacunchTask启动Task部分进行阅读,我们下篇文章见~


亲,看完了,点个赞呗

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

邮箱地址不会被公开。