Spark源码阅读:TaskScheduler任务提交与资源分配
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
上一篇文件介绍了TaskSchedulerImpl的调度算法和调度池
TaskSchedulerImpl按照调度算法将资源分配给Task,TaskSchedulerImpl对任务的调度基于调度池
了解相关内容,请戳:Spark源码阅读:TaskScheduler调度算法与调度池
这篇文章继续对TaskSchedulerImpl的中Task提交进行阅读
提交Task
在DAGScheduer对Stage划分,并对Task最佳位置进行了计算后(了解相关内容,请访问:Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算)
在DAGScheduer将Task提交给TaskScheduler之前,将多个Task打包为TaskSet后(TaskSet是整个调度中对Task进行调度管理的基本单位),调用taskScheduler的submitTasks方法
实则是调用了TaskSchedulerImpl的submitTasks方法
查看submitTasks方法,做了如下事情:
1.获取TaskSet中的task数组
2.(重点)为每个taskSet创建了一个TaskSetManager(任务集合管理器),用于管理TaskSet,包括任务推断,Task本地性,并对Task的资源进行分配
3.向调度池中添加刚才创建TaskSetManager
4.判断应用程序是否为local模式并且没有接收到Task
如果是创建一个定时器,通过指定时间检查TaskSchedulerImpl的饥饿状况
如果接收到Task,则取消这个定时器
5.同步块中最后判断TaskSchedulerImpl是否接收到Task
如果接收到Task,则调用backend.reviveOffers()方法,为Task分配资源
override def submitTasks(taskSet: TaskSet) { // 获取taskSet中task val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { // 为每个taskSet创建了一个TaskSetManager val manager = createTaskSetManager(taskSet, maxTaskFailures) // 获取stage id val stage = taskSet.stageId // 由于TaskSetManager不是线程安全的,所有对它的访问都应该同步,所以添加了synchronized修饰 // 并创建一个HashMap,用于存储stage对应的TaskSetManager val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) // 将TaskSetManager加入刚才创建的HashMap中,进行缓存 stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } // 向调度池中添加刚才创建的TaskSetManager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 判断应用程序是否为local模式并且TaskSchedulerImpl没有接收到Task if (!isLocal && !hasReceivedTask) { // 创建一个定时器,通过指定时间检查TaskSchedulerImpl的饥饿状况 starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { // 如果TaskSchedulerImpl接收到Task,则取消这个定时器 if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } // 判断是否接受到Task hasReceivedTask = true } // 给Task分配资源 backend.reviveOffers() }
资源分配
接下来,调用backend的reviveOffers方法为Task分配资源
我们的提交模式为Standalone,所以当前的backend为StandaloneSchedulerBackend
StandaloneSchedulerBackend的reviveOffers方法是继承其父类的CoarseGrainedSchedulerBackend的reviveOffers方法
来看CoarseGrainedSchedulerBackend的reviveOffers方法:向driver Endpoint发送消息ReviveOffers,driver Endpoint实则就是CoarseGrainedSchedulerBackend在启动时创建的driver Endpoint
override def reviveOffers() { // 向driver Endpoint发送消息ReviveOffers driverEndpoint.send(ReviveOffers) }
在CoarseGrainedSchedulerBackend driver Endpoint接受到发送来的ReviveOffers消息后,进行模式匹配,调用makeOffers方法
case ReviveOffers => makeOffers()
makeOffers方法向所有可用executor分配虚假资源
该方法内容如下:
1.筛选掉死去的executors
2.将这个application所有可用的executor,将其封装为workOffers,每个workOffers代表了每个executor可用的cpu资源数量
3.调用scheduler的resourceOffers方法,将workOffers传入,实际调用的是TaskSchedulerImpl的resourceOffers方法
4.获取scheduler.resourceOffers的返回值,调用launchTasks方法,启动task
// 向所有的executor提供虚假资源 private def makeOffers() { // 筛选掉死掉的executor val activeExecutors = executorDataMap.filterKeys(executorIsAlive) // 将这个application所有可用的executor,将其封装成了workOffers,每个workOffers代表了每个executor可用的cpu资源数量 val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq // 调用scheduler.resourceOffers的方法,将workOffers传入 // 获取scheduler.resourceOffers的返回值,调用launchTasks方法 launchTasks(scheduler.resourceOffers(workOffers)) }
在启动task之前还有重要的一步,就是TaskSchedulerImpl的resourceOffers方法
了解这个方法前,先来说一下Task的本地性
Task本地性
Spark现在支持五种本地化级别,级别从高到低顺序为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
PROCESS_LOCAL进程本地化:task要计算的数据在同一个Executor中,即同一个JVM中
NODE_LOCAL 节点本地化:速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
NO_PREF:没有偏好
RACK_LOCAL 机架本地化:数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比NODE_LOCAL慢
ANY:跨机架,数据在非同一机架的网络上,速度最慢
Task的本地性是如何进行运算的呢?
在TaskSetManger中有一个变量myLocalityLevels,即TaskSet的本地化级别,通过调用TaskSetManger的computeValidLocalityLevels方法得到
感兴趣的小伙伴,请自行阅读
来看resourceOffers方法,该方法负责进行资源分配,步骤如下:
1.遍历WorkerOffer
(1)更新host与executor的映射关系
(2)标记添加了新的executor
(3)更新host与机架之间的关系
2.可用的executor进行shuffle分散,避免将task放在同一个worker上,进行负载均衡
3.根据每个WorkerOffer的可用的cpu核数创建同等尺寸的TaskDescription数组
4.将每个WorkerOffer的可用的cpu核数统计到availableCpus数组中
5.按照调度算法排序,从调度池中获取排序的taskSet队列
6.重点:taskSet任务分配算法
算法中,执行步骤如下:
(1) 遍历taskSet,从最快的本地化级别开始,调用resourceOfferSingleTaskSet方法,给每个Task Set中Task进行分配资源
看一下resourceOfferSingleTaskSet方法:
遍历WorkerOffer,如果当前executor的cpu数大于每个task所使用的cpu数量,则选择在该executor上启动task,调用taskSet的resourceOffer方法,在executor上,使用这次本地化级别,查看那些task可用启动
即对当前taskSet,尝试使用最快的本地化级别PROCESS_LOCAL,给task分配资源,让task在executor上启动,如果启动异常,跳出循环,换下一种本地化级别尝试,直到taskSet在某个本地化级别下,task在executor上启动
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { // 是否启动task var launchedTask = false //遍历所有WorkerOffer for (i <- 0 until shuffledOffers.size) { // 获取executorId val execId = shuffledOffers(i).executorId // 获取host val host = shuffledOffers(i).host // 如果当前executor可用的CPU大于每个task使用的cpu数量(默认为1) if (availableCpus(i) >= CPUS_PER_TASK) { try { // 调用taskSet的resourceOffer方法,在executor上,使用这次本地化级别,查看那些task可用启动 // 遍历TaskSet中Task,使用当前本地化级别启动task,在executor上启动task for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // 放入二维数组,给指定的executor添加符合条件的task tasks(i) += task // 将相应的信息cache val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) // 标识为true launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") return launchedTask } } } return launchedTask }
(2)如果在所有TaskSet所允许的本地级别下,TaskSet中没有任何一个Task成功启动,调用taskSet的abortIfCompletelyBlacklisted方法,将其添加到黑名单,放弃该task
(3)返回已经获得资源的task列表
taskSet任务分配算法代码如下:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false // 遍历WorkerOffer for (o <- offers) { // 判断HashSet中是否存在executor的Host if (!hostToExecutors.contains(o.host)) { // 如果不存在,则添加 hostToExecutors(o.host) = new HashSet[String]() } // 判断正在运行的Task对应的executor id HashMap集合中是否存在executor id if (!executorIdToRunningTaskIds.contains(o.executorId)) { // 如果不存在则添加executor等信息 hostToExecutors(o.host) += o.executorId executorAdded(o.executorId, o.host) executorIdToHost(o.executorId) = o.host executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() newExecAvail = true } for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. // 首先将可用的executor进行shuffle分散,避免将task放在同一个worker上,进行负载均衡 val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. // 根据每个WorkerOffer的可用的cpu核数创建同等尺寸的TaskDescription数组 val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 将每个WorkerOffer的可用的cpu核数统计到availableCpus数组中 val availableCpus = shuffledOffers.map(o => o.cores).toArray // 创建完TaskSchedulerImpl和SchedulerBackend后,对TaskSchedulerImpl调用方法initialize进行初始化 // 创建Pool调度池,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息 // 这里的rootPool就是之前创建好的调度池,在执行task分配时,要从调度池中获取排序的taskSet队列 val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() } } // taskSet任务分配算法,双重for循环,遍历所有的taskSet,以及myLocalityLevels本地化级别 // 遍历taskSet,从最快的本地化级别开始,调用resourceOfferSingleTaskSet方法 for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { do { // 对当前taskSet,尝试优先使用最快的本地化级别,为task分配资源,在executor上进行启动task // 如果启动异常,跳出循环,换下一种本地化级别 // 尝试直到taskSet在某个本地化级别下,在executor上成功启动task launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } if (tasks.size > 0) { hasLaunchedTask = true } // 返回已经获得资源的task列表 return tasks }
至此,整个Task的提交、分配过程完成。
VV花了一张图,总结了整个Task提交流程,如下图:
下一篇文章,会对最后的lacunchTask启动Task部分进行阅读,我们下篇文章见~
亲,看完了,点个赞呗