Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
前一篇文章阅读了关于DAG相关的源码,具体请查看文章:Spark源码阅读:DAGScheduler源码分析
本篇继续对DAGScheduler中Stage的划分部分源码,结合例子进行,以及对整个DAGScheduler进行总结
开始看源码之前,这里有张宽依赖和窄依赖的图
窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。图中,map/filter和union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。
宽依赖:指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle操作,如图中的groupByKey和未经协同划分的join。
划分Stage:以shuffle和result这两种类型来划分。
在Spark中有两类task,一类是shuffleMapTask,一类是resultTask
第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据。shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduceByKey,是一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。
同理,如果job中有多次shuffle,那么每个shuffle之前都是一个stage。
Stage划分算法有如下几个图步骤:
具体Stage划分算法,这里不再概述,请查看文章:Spark源码阅读:DAGScheduler源码分析
以下图为例,我们过一遍整个Stage的划分
触发Job的finalRDD,即图中的final RDD为RDD G
首先通过getMissingParentStages方法判断RDD G是否含有父Stage
getMissingParentStages方法中,先创建了一个存放RDD的栈
接着往栈中,推入了RDD G
val waitingForVisit = new Stack[RDD[_]]
//首先往栈中,推入了stage最后的一个rdd finalRDD waitingForVisit.push(stage.rdd)
进行while循环,由于上面推入了RDD G,栈不为空,那么调用内部visit方法
/进行while循环 while (waitingForVisit.nonEmpty) { // 对stage的最后一个rdd,调用自己内部定义的visit()方法 visit(waitingForVisit.pop()) }
判断rdd是否被访问过,若符合条件,先将该rdd添加到已访问,继续往下执行
if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
接下来判断RDD G的依赖是宽依赖还是窄依赖
图中可见,RDD B与RDD G是窄依赖,将RDD B放入栈中,RDD F与RDD G是宽依赖
于是调用getOrCreateShuffleMapStage方法创建一个了stage,RDD G与RDD F被划分成了两个Stage,将RDD G所在Stage划分成了Stage 3,将RDD F所在的Stage划分成了Stage 2,返回一个list[Stage]
继续进行判断,从RDD B开始,RDD B往前找它的父RDD,父RDD A与RDD B是宽依赖的关系,所以,RDD A与RDD B中间也创建了一个Stage,RDD A与RDD B被划分成了两个Stage,RDD A被划分成Stage 1
RDD F与它的父RDD D/E都是窄依赖关系,不产生新的Stage,RDD D与RDD C也是窄依赖关系,也不产生新的Stage,所以RDD C、D、E、F都在Stage 2中。
到这里,整个的Stage划分完成了,返回划分好的stage list
if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { //如果是宽依赖 case shufDep: ShuffleDependency[_, _, _] => // 那么使用宽依赖的那个RDD,使用getOrCreateShuffleMapStage()方法去创建一个stage // 默认最后一个stage,不是shuffleMap stage // 但是finalStage之前所有的stage,都是shuffleMap Stage val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } //如果是窄依赖,将rdd放入栈中 case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } } }
我们来整理一下整个Stage的划分思路:
从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。
通过上面的Stage划分,我们不断迭代,提交Stage,获取最开始的Stage,即Stage 1
接下来要对当前传入的Stage 1进行判断,判断该Stage有没有父Stage或父Stage是否执行完毕
Stage 1没有父Stage,调用submitMissingTasks方法,用于提交Stage 1中的Task
submitMissingTasks方法中做了如下事情:
1.首先获取stage中没有计算的partition
2.获取task对应的partition的最佳位置(Task最佳位置计算)
3.获取taskBinary,将Stage的RDD和ShuffleDependency(或func)广播到Executor
4.为stage创建task
为了保证计算数据的本地性(数据在哪个节点上,就到哪个节点上的Executor上去运行),DAGScheduler中提供了寻找task对应的partition最佳位置的方法
我们看一下task的最佳位置计算是怎么实现的,事实上调用了getPreferredLocs的getPreferredLocsInternal方法
1.判断rdd的partition是否被访问过,如果被访问过,则什么都不做
2.然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回
3.如果没有cache,则调用rdd.getPreferedLocations方法,获取RDD partition的最佳位置
final def preferredLocations(split: Partition): Seq[String] = { checkpointRDD.map(_.getPreferredLocations(split)).getOrElse { getPreferredLocations(split) } }
4.遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法
即,从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列
如果该stage中rdd,从最后一个rdd,到最开始的rdd的partition都没有被cache或者checkpoint,那么,task的最佳位置为Nil,即没有最佳位置
private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { if (!visited.add((rdd, partition))) { // Nil has already been returned for previously visited partitions. return Nil } // 查看当前rdd的partition是否被DAGSchduler缓存了 val cached = getCacheLocs(rdd)(partition) if (cached.nonEmpty) { return cached } // 调用rdd的preferredLocations方法,获取rdd partition的最佳位置 val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { return rddPrefs.map(TaskLocation(_)) } // 遍历RDD的依赖,如果有窄依赖,获取其父依赖的partition // 递归调用getPreferredLocsInternal方法,从第一个窄依赖的第一个partition开始找到task最佳位置,然后返回task最佳位置序列 rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } //如果这个stage,从最后一个rdd,到最开始的rdd,partition都没有被缓存或者checkpoint // 那么,task的最佳位置为Nil,就是没有 case _ => } Nil }
DAGScheduler计算数据本地性的时候巧妙的借助了RDD自身的getPreferedLocations中的数据,最大化的优化的效率,因为getPreferedLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferedLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化
获取到task的最佳位置后,根据stage的类型进行匹配,为每个partition创建一个task
将之前准备好的task最佳位置,taskBinary等其他信息作为参数传入,创建Task
如果是ShuffleMapStage,创建ShuffleMapTask,添加到tasks中
case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) // 创建ShuffleMapTask new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) }
如果是ResultStage,那么创建ResultTask,添加到tasks中
case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) // 创建ResultTask new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) }
如果tasks序列中添加了创建好的task,即tasks.size大于0,则为stage中的task,创建TaskSet对象,调用taskScheduler的submitTasks方法,提交TaskSet给TaskScheduler
如果tasks.size小于等于0,将当前stage标记为完成,然后调用submitWaitingChildStages方法,提交当前stage的子stage
if (tasks.size > 0) { logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) // 最后,针对stage的task,创建TaskSet对象,调用taskScheduler的submitTasks方法,提交TaskSet taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // 将stage标记为已完成 markStageAsFinished(stage, None) val debugString = stage match { case stage: ShuffleMapStage => s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})" case stage : ResultStage => s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString) // 调用该方法,提交当前stage的子stage submitWaitingChildStages(stage) }
从提交最初的Stage中Task Set给TaskScheduler开始,然后逐步的将Stage中的Task Set提交,直到Stage提交完成
剩余的部分由TaskScheduler对Task Set进行调度
至此,DAGScheduler整个流程写完了,接下来阅读TaskScheduler Task提交部分,我们下篇文章见~
亲,看完了点个赞呗
默默点赞,偷偷来取经!
好的~
兄dei,我也在看这本书,也打算自己读一遍写个东西记录下,同道中人赞一下