Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


前一篇文章阅读了关于DAG相关的源码,具体请查看文章:Spark源码阅读:DAGScheduler源码分析

本篇继续对DAGScheduler中Stage的划分部分源码,结合例子进行,以及对整个DAGScheduler进行总结

开始看源码之前,这里有张宽依赖和窄依赖的图

窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。图中,map/filter和union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。

宽依赖:指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle操作,如图中的groupByKey和未经协同划分的join。 

划分Stage:以shuffle和result这两种类型来划分。

在Spark中有两类task,一类是shuffleMapTask,一类是resultTask

第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据。shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduceByKey,是一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。

同理,如果job中有多次shuffle,那么每个shuffle之前都是一个stage。

Stage划分算法有如下几个图步骤:

具体Stage划分算法,这里不再概述,请查看文章:Spark源码阅读:DAGScheduler源码分析



以下图为例,我们过一遍整个Stage的划分


触发Job的finalRDD,即图中的final RDD为RDD G

首先通过getMissingParentStages方法判断RDD G是否含有父Stage

getMissingParentStages方法中,先创建了一个存放RDD的栈

接着往栈中,推入了RDD  G

val waitingForVisit = new Stack[RDD[_]]

 //首先往栈中,推入了stage最后的一个rdd  finalRDD
    waitingForVisit.push(stage.rdd)

进行while循环,由于上面推入了RDD G,栈不为空,那么调用内部visit方法

/进行while循环
    while (waitingForVisit.nonEmpty) {
      // 对stage的最后一个rdd,调用自己内部定义的visit()方法
      visit(waitingForVisit.pop())
    }

判断rdd是否被访问过,若符合条件,先将该rdd添加到已访问,继续往下执行

if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

接下来判断RDD G的依赖是宽依赖还是窄依赖

图中可见,RDD B与RDD G是窄依赖,将RDD B放入栈中,RDD F与RDD G是宽依赖

于是调用getOrCreateShuffleMapStage方法创建一个了stage,RDD G与RDD F被划分成了两个Stage,将RDD G所在Stage划分成了Stage 3,将RDD F所在的Stage划分成了Stage 2,返回一个list[Stage]

继续进行判断,从RDD B开始,RDD B往前找它的父RDD,父RDD A与RDD B是宽依赖的关系,所以,RDD A与RDD B中间也创建了一个Stage,RDD A与RDD B被划分成了两个Stage,RDD A被划分成Stage 1

RDD F与它的父RDD D/E都是窄依赖关系,不产生新的Stage,RDD D与RDD C也是窄依赖关系,也不产生新的Stage,所以RDD C、D、E、F都在Stage 2中。

到这里,整个的Stage划分完成了,返回划分好的stage list

if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                // 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage
                // 默认最后一个stage,不是shuffleMap stage
                // 但是finalStage之前所有的stage,都是shuffleMap Stage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
                //如果是窄依赖,将rdd放入栈中
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }

我们来整理一下整个Stage的划分思路:

从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。


通过上面的Stage划分,我们不断迭代,提交Stage,获取最开始的Stage,即Stage 1

接下来要对当前传入的Stage 1进行判断,判断该Stage有没有父Stage或父Stage是否执行完毕

Stage 1没有父Stage,调用submitMissingTasks方法,用于提交Stage 1中的Task

submitMissingTasks方法中做了如下事情:

1.首先获取stage中没有计算的partition

2.获取task对应的partition的最佳位置(Task最佳位置计算

3.获取taskBinary,将Stage的RDD和ShuffleDependency(或func)广播到Executor

4.为stage创建task


为了保证计算数据的本地性(数据在哪个节点上,就到哪个节点上的Executor上去运行),DAGScheduler中提供了寻找task对应的partition最佳位置的方法

我们看一下task的最佳位置计算是怎么实现的,事实上调用了getPreferredLocs的getPreferredLocsInternal方法

1.判断rdd的partition是否被访问过,如果被访问过,则什么都不做

2.然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回

3.如果没有cache,则调用rdd.getPreferedLocations方法,获取RDD partition的最佳位置

 final def preferredLocations(split: Partition): Seq[String] = {
      checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
      }
  }

4.遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法

即,从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列

如果该stage中rdd,从最后一个rdd,到最开始的rdd的partition都没有被cache或者checkpoint,那么,task的最佳位置为Nil,即没有最佳位置

 private def getPreferredLocsInternal( rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {

    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }

    // 查看当前rdd的partition是否被DAGSchduler缓存了
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }

    // 调用rdd的preferredLocations方法,获取rdd partition的最佳位置
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }


    //  遍历RDD的依赖,如果有窄依赖,获取其父依赖的partition
    //  递归调用getPreferredLocsInternal方法,从第一个窄依赖的第一个partition开始找到task最佳位置,然后返回task最佳位置序列
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

        //如果这个stage,从最后一个rdd,到最开始的rdd,partition都没有被缓存或者checkpoint
        //  那么,task的最佳位置为Nil,就是没有
      case _ =>
    }
    Nil
  }

DAGScheduler计算数据本地性的时候巧妙的借助了RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化


获取到task的最佳位置后,根据stage的类型进行匹配,为每个partition创建一个task

将之前准备好的task最佳位置,taskBinary等其他信息作为参数传入,创建Task

如果是ShuffleMapStage,创建ShuffleMapTask,添加到tasks中

 case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            // 创建ShuffleMapTask
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

如果是ResultStage,那么创建ResultTask,添加到tasks中

  case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            //  创建ResultTask
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }

如果tasks序列中添加了创建好的task,即tasks.size大于0,则为stage中的task,创建TaskSet对象,调用taskScheduler的submitTasks方法,提交TaskSet给TaskScheduler

如果tasks.size小于等于0,将当前stage标记为完成,然后调用submitWaitingChildStages方法,提交当前stage的子stage

if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      //  最后,针对stage的task,创建TaskSet对象,调用taskScheduler的submitTasks方法,提交TaskSet
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      //   将stage标记为已完成
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
      //   调用该方法,提交当前stage的子stage
      submitWaitingChildStages(stage)
    }

从提交最初的Stage中Task Set给TaskScheduler开始,然后逐步的将Stage中的Task Set提交,直到Stage提交完成

剩余的部分由TaskScheduler对Task Set进行调度

至此,DAGScheduler整个流程写完了,接下来阅读TaskScheduler Task提交部分,我们下篇文章见~

亲,看完了点个赞呗

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

3 Responses

  1. 梦点点说道:

    默默点赞,偷偷来取经!

  2. 匿名说道:

    兄dei,我也在看这本书,也打算自己读一遍写个东西记录下,同道中人赞一下

发表评论

邮箱地址不会被公开。