Spark源码阅读:DAGScheduler源码分析


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


Executor启动完了,接下来就准备在executor上执行task了,关于task任务的执行,就是我们接下来要说的TaskScheduler和DAGScheduler了

它们的作用如下:

TaskScheduler作用是为创建它的SparkContext调度任务,即从DAGScheduler接受不同Stage的任务,并且向集群提交这些任务

DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据

而且DAGScheduler在不同的资源管理框架下的实现是相同的,DAGScheduler将这组Task划分完成后,会将这组Task提交到TaskScheduler

TaskScheduler通过Cluster Manager在集群的某个Worker的Executor上启动任务


看完了概念,那么在具体的Spark任务中,DAGScheduler是如何触发的呢?

任务的触发,就要结合我们的SparkPi程序

这是SparkPi程序运行时,在Spark Web UI上查看spark job的运行情况,可以直观的看到Active Job,以及Job事件时间线


在SparkPi程序中,使用RDD的action算子,会触发Spark任务,即在SparkPi程序中的reduce算子,触发了任务的执行

object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

看一下reduce算子,查看触发DAGScheduler的流程

在reduce中发现,其实调用了sparkContext的runJob方法

def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

SparkContext的runJob方法,其中调用了多层runJob方法

  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }

第一层runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }

第二层runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }

第三层runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

实际上是dagScheduler调用了runJob方法再跟着这个runJob走

进入DAGScheduler.scala

runJob方法中调用了submitJob方法,用于提交job,该方法返回一个JobWaiter,用于等待DAGScheduler任务完成

/**
    * 在给定的RDD上执行一个action job并将所有结果传递给resultHandler函数
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) 

submitJob方法,调用eventProcessLoop post将JobSubmitted事件添加到DAGScheduler事件队列中

这个eventProcessLoop是DAGSchedulerEventProcessLoop,DAGSchedulerEventProcessLoop是来对DAGScheduler主要事件进行管理

 eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

 private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  taskScheduler.setDAGScheduler(this)

提交的JobSubmitted事件,实际上调用了DAGScheduler的handleJobSubmitted方法

case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

到这里,我们找到了DAGScheduler调度的核心入口handleJobSubmitted

一共做了四个操作:

1.使用触发job的最后一个rdd,创建finalStage

这说一下stage,stage有两种,一种是ShuffleMapStage,另一种是ResultStage

ShuffleMapStage用来处理Shuffle

ResultStage是最后执行的Stage,用于任务的收尾工作

ResultStage之前的stage都是ShuffleMapStage

2.用finalStage创建一个Job,这个job的最后一个stage,就是finalStage

3.将Job相关信息,加入内存缓冲中

4.第四步,使用submitStage方法提交finalStage

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {

    var finalStage: ResultStage = null
    try {

      // 第一步:
      //使用触发job的最后一个RDD,创建finalStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

      //第二步:用finalStage创建一个Job
      //这个job的最后一个stage,就是finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

      //第三步,将Job相关信息,加入内存缓冲中
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      //第四步,使用submitStage方法提交finalStage
      //这个方法的调用,其实会提交第一个stage,并将其他的stage放到waitingStages队列中
    submitStage(finalStage)

  }

submitStage方法用于提交stage

1.对stage的active job id进行验证,如果存在,进行第2步,如果不存在,则abortStage终止

2.在提交这个stage之前,先要判断当前stage不是等待中/运行中/失败的stage,如果都不是,进行第3步,如果是,则abortStage终止

3.调用getMissingParentStages方法,获取当前stage的所有未提交的父stage

    如果不存在stage未提交的父stage,调用submitMissingTasks方法,提交当前stage所有未提交的task

    如果存在未提交的父Stage,递归调用submitStage提交所有未提交的父stage(即如果这个stage一直有未提交的父stage,一直进行调用,直到最开始的stage 0),并将当前stage加入waitingStages等待执行队列中


private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //调用getMissingParentStages获取当前这个stage的父stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        /*
        这里会反复递归调用,直到最开始的stage,它没有父stage,此时就会提交这个最开始的stage,stage0
        其余的stage都在waitingStages中
         */
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          //如果有父stage
          //递归调用submitStage()方法,去提交父stage
          //这里的递归就是stage划分算法的推动和亮点!
          for (parent <- missing) {
            submitStage(parent)
          }
          //并且将当前的stage添加到waitingStages等待执行的队列中
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

其中,调用的getMissingParentStages方法,就涉及到了stage的划分算法

在划分算法中做了如下操作:

1.创建了一个存放rdd的栈

2.往栈中推入了stage的最后一个rdd

3.如果栈不为空,对这个rdd调用内部定义的visit方法

在visit方法中做了如下操作:

1.遍历传入rdd的依赖

2.如果是宽依赖,调用getOrCreateShuffleMapStage方法,创建一个新的stage

   如果是窄依赖,将rdd放入栈中

3.将新的stage list返回

/*
  获取某个stage的父stage
  这个方法,对于一个stage,如果它的最后一个rdd的所有依赖,都是窄依赖,那么就不会创建任何新的stage
  但只要发现这个stage的rdd宽依赖了某个rdd,那么就用宽依赖的那个rdd,创建一个新的stage
  然后立即将新的stage返回
   */
  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        //遍历RDD的依赖
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                // 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage
                // 默认最后一个stage,不是shuffleMap stage
                // 但是finalStage之前所有的stage,都是shuffleMap Stage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
                //如果是窄依赖,将rdd放入栈中
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    //首先往栈中,推入了stage最后的一个rdd  finalStage
    waitingForVisit.push(stage.rdd)
    //进行while循环
    while (waitingForVisit.nonEmpty) {
      // 对stage的最后一个rdd,调用自己内部定义的visit()方法
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

顺便看一下getOrCreateShuffleMapStage方法,用于创建stage

方法中调用createShuffleMapStage方法,为给定的宽依赖创建ShuffleMapStage

  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      ...
        // 为给定的shuffle依赖创建一个stage
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

createShuffleMapStage会创建一个新的ShuffleMapStage

 val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

stage的创建和划分就完成了。

在Spark Web UI上看到当前的stage如下:

查看Active Stage为1


点击Description,查看stage描述

查看Spark Pi程序生成的DAG图如下:

因为程序中不含有shuffle操作,即没有宽依赖,所以只有一个stage,为finalStage。



总结一下:

DAGScheduler的stage的划分算法很重要,想要很好的掌握spark,要对stage划分算法很清晰,知道自己编写的spark application被划分了几个job,每个job被划分成了几个stage,每个stage包含了哪些代码,定位到代码。

当我们遇到异常时,发现stage运行过慢,或者stage报错,我们能针对那个stage对应的代码去排查问题,或者性能调优。

好啦,我们下篇文章见~

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

邮箱地址不会被公开。