Reading the Spark Source: TaskScheduler Scheduling Algorithms and Scheduling Pools
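Series guide:
If you want to work through the Spark source, please read the articles in the order below.
6. Master registration: Application registration and the Application scheduling algorithm
8. Starting CoarseGrainedExecutorBackend
10. DAGScheduler Stage division and preferred Task locations
13. Starting a Task
14. Handling Task results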
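After DAGScheduler has divided the job into Stages and computed the preferred location of each Task (see: Reading the Spark Source: DAGScheduler Stage Division and Preferred Task Locations),
it packs the Tasks of a Stage into a TaskSet and submits it to the TaskScheduler by calling taskScheduler's submitTasks method.
Before looking at that method, let's get to know TaskScheduler and TaskSchedulerImpl.
(For how the TaskScheduler is initialized, see: Reading the Spark Source: Creating and Starting the TaskScheduler)
TaskScheduler is a trait; within Spark's core its only implementation is the TaskSchedulerImpl class.
TaskSchedulerImpl receives the TaskSet that DAGScheduler creates for each Stage, allocates resources to Tasks according to the scheduling algorithm, hands the Tasks to Executors on the Spark cluster to run, retries Tasks that fail, and uses speculative execution to reduce the impact of straggler Tasks on overall job progress.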
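Here are the important components inside TaskSchedulerImpl:
1. Pool (scheduling pool): TaskSchedulerImpl schedules tasks on top of scheduling pools
2. TaskSetManager: manages a TaskSet, including speculation, Task locality, and allocating resources to Tasks
Today we start with the Pool.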
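Much like Yarn queues or the JVM's GC roots, the scheduling pool has a root, rootPool, which contains a number of child pools.
A child pool can in turn contain other pools or TaskSetManagers.
(TaskSetManager is covered in the next article.)
The scheduling pool as a whole is therefore a multi-level tree.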
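To make the hierarchy concrete, here is a minimal sketch of such a tree. It does not use Spark's classes: `PoolNode` and `TaskSetLeaf` are hypothetical stand-ins for Pool and TaskSetManager, the pool names are borrowed from the sample fairscheduler.xml shown later, and the TaskSet names are made up.

```scala
// Simplified stand-ins for Spark's Pool and TaskSetManager (illustrative only).
object PoolTreeDemo {
  sealed trait Node { def name: String }

  // A leaf: stands in for a TaskSetManager.
  final case class TaskSetLeaf(name: String) extends Node

  // An inner node: stands in for a Pool, which may hold pools or task sets.
  final case class PoolNode(name: String, children: List[Node]) extends Node

  // Collect the names of all leaves under a node, depth first.
  def leaves(node: Node): List[String] = node match {
    case TaskSetLeaf(n)        => List(n)
    case PoolNode(_, children) => children.flatMap(leaves)
  }

  // Depth of the tree: a lone leaf has depth 1, an empty pool has depth 1.
  def depth(node: Node): Int = node match {
    case TaskSetLeaf(_)        => 1
    case PoolNode(_, children) =>
      1 + children.map(depth).foldLeft(0)((a, b) => math.max(a, b))
  }

  // Root pool (empty name) -> two child pools -> task sets.
  val root: Node = PoolNode("", List(
    PoolNode("production", List(TaskSetLeaf("TaskSet_0.0"), TaskSetLeaf("TaskSet_1.0"))),
    PoolNode("test", List(TaskSetLeaf("TaskSet_2.0")))
  ))

  def main(args: Array[String]): Unit = {
    println(leaves(root))
    println(depth(root))
  }
}
```

The root has an empty name here because the rootPool in the article is also created with the name "".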
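Creating a Pool takes four parameters:
1. poolName: the name of the pool
2. schedulingMode: the scheduling mode of the pool, one of three values: FAIR (fair scheduling), FIFO (first in, first out), NONE
3. initMinShare: the initial value of minShare
4. initWeight: the initial value of weight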
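One member of the Pool class deserves special attention: taskSetSchedulingAlgorithm.
How a pool schedules its TaskSets depends on the scheduling algorithm, which is chosen by matching on the scheduling mode.
For the FIFO mode it is the FIFO (first in, first out) algorithm; for the FAIR mode it is the FAIR (fair scheduling) algorithm; any other mode throws an IllegalArgumentException.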
    var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
      schedulingMode match {
        case SchedulingMode.FAIR =>
          new FairSchedulingAlgorithm()
        case SchedulingMode.FIFO =>
          new FIFOSchedulingAlgorithm()
        case _ =>
          // The s interpolator is required for $schedulingMode to be substituted
          val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
          throw new IllegalArgumentException(msg)
      }
    }
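Now let's look at how the two scheduling algorithms are implemented.
Both implement the trait org.apache.spark.scheduler.SchedulingAlgorithm.
The trait defines a single method that compares two Schedulables and returns true when s1 should be scheduled before s2:
    def comparator(s1: Schedulable, s2: Schedulable): Boolean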
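Implementation of the FIFO scheduling algorithm
1. Get the priority of the two Schedulables s1 and s2
2. Compare the two priorities; the smaller value wins
3. If the priorities are equal, compare the stageId of the two Schedulables
4. If the result is less than 0, schedule s1 first; otherwise schedule s2 first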
    private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
      override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val priority1 = s1.priority
        val priority2 = s2.priority
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
          val stageId1 = s1.stageId
          val stageId2 = s2.stageId
          res = math.signum(stageId1 - stageId2)
        }
        res < 0
      }
    }
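A small self-contained sketch can show the effect of this rule on an ordering. `Entry` is a hypothetical stand-in for a Schedulable that carries only the two fields FIFO looks at, and the sample values are invented. As far as I can tell from the source, a TaskSetManager's priority comes from the job ID, so in practice this means "earlier job first, then earlier stage".

```scala
object FifoDemo {
  // Hypothetical stand-in for a Schedulable: only the fields FIFO reads.
  final case class Entry(name: String, priority: Int, stageId: Int)

  // Same decision rule as the FIFO comparator: lower priority value first,
  // ties broken by lower stageId.
  def fifoLess(a: Entry, b: Entry): Boolean = {
    var res = math.signum(a.priority - b.priority)
    if (res == 0) {
      res = math.signum(a.stageId - b.stageId)
    }
    res < 0
  }

  // Sort the entries into scheduling order and return their names.
  def order(entries: Seq[Entry]): Seq[String] =
    entries.sortWith(fifoLess).map(_.name)

  def main(args: Array[String]): Unit = {
    val entries = Seq(Entry("c", 1, 3), Entry("a", 0, 2), Entry("b", 0, 1))
    println(order(entries))
  }
}
```

With the sample data, "b" and "a" share priority 0 and are separated by stageId, while "c" has the larger priority value and comes last.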
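Implementation of the FAIR scheduling algorithm
1. For both Schedulables s1 and s2, compute minShare, runningTasks, needy (whether runningTasks is below minShare), minShareRatio (runningTasks divided by minShare, with the divisor at least 1), and taskToWeightRatio (runningTasks divided by weight)
2. If s1 has fewer running tasks than its minShare while s2 has at least its minShare, s1 is short of resources, so schedule s1 first
3. Conversely, if s1 has at least its minShare while s2 has fewer running tasks than its minShare, s2 is short of resources, so schedule s2 first
4. If both s1 and s2 have fewer running tasks than their minShare, compare minShareRatio: the one with the smaller minShareRatio is scheduled first
5. If both s1 and s2 have at least their minShare, compare taskToWeightRatio: the one with the smaller taskToWeightRatio is scheduled first
6. If the compared ratios are equal, compare the names of s1 and s2: if s1's name is smaller, schedule s1 first; otherwise schedule s2 first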
    private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
      override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        val minShare1 = s1.minShare
        val minShare2 = s2.minShare
        val runningTasks1 = s1.runningTasks
        val runningTasks2 = s2.runningTasks
        val s1Needy = runningTasks1 < minShare1
        val s2Needy = runningTasks2 < minShare2
        val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
        val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
        val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
        val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

        var compare = 0
        if (s1Needy && !s2Needy) {
          return true
        } else if (!s1Needy && s2Needy) {
          return false
        } else if (s1Needy && s2Needy) {
          compare = minShareRatio1.compareTo(minShareRatio2)
        } else {
          compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
        }
        if (compare < 0) {
          true
        } else if (compare > 0) {
          false
        } else {
          s1.name < s2.name
        }
      }
    }
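The FAIR rule is easier to follow with numbers. Below is a minimal sketch that applies the same decision rule to a few made-up pools. `Entry` is a hypothetical stand-in for a Schedulable, and the pool names and values are invented for illustration; nothing here calls into Spark.

```scala
object FairDemo {
  // Hypothetical stand-in for a Schedulable with the fields the FAIR rule reads.
  final case class Entry(name: String, runningTasks: Int, minShare: Int, weight: Int)

  // Same decision rule as the FAIR comparator, written without early returns.
  def fairLess(s1: Entry, s2: Entry): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble

    val compare =
      if (s1Needy && !s2Needy) -1          // only s1 is below its minShare
      else if (!s1Needy && s2Needy) 1      // only s2 is below its minShare
      else if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
      else java.lang.Double.compare(taskToWeightRatio1, taskToWeightRatio2)

    // Equal ratios fall back to comparing names.
    if (compare != 0) compare < 0 else s1.name < s2.name
  }

  def order(entries: Seq[Entry]): Seq[String] =
    entries.sortWith(fairLess).map(_.name)

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      Entry("production", runningTasks = 1, minShare = 2, weight = 1), // needy, ratio 0.5
      Entry("test", runningTasks = 4, minShare = 3, weight = 2),       // satisfied, 4/2 = 2.0
      Entry("adhoc", runningTasks = 3, minShare = 0, weight = 1),      // satisfied, 3/1 = 3.0
      Entry("etl", runningTasks = 0, minShare = 2, weight = 1)         // needy, ratio 0.0
    )
    println(order(entries))
  }
}
```

The two pools below their minShare ("etl" and "production") come first, ordered by minShareRatio; the two satisfied pools follow, ordered by taskToWeightRatio, so a higher weight lets "test" run more tasks before it yields to "adhoc".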
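With the scheduling algorithms covered, let's look at how the Pools are created.
The root pool is created in TaskSchedulerImpl's initialize method:
1. Create rootPool with its parameters. The schedulingMode comes from the Spark setting spark.scheduler.mode, which defaults to FIFO
2. Match on the scheduling mode to obtain the corresponding schedulableBuilder
As in the Builder design pattern, the schedulableBuilder's job is to fill the pool: once the pond has been dug it is empty, and a builder has to create its contents (for example, fill it with water)
3. Call the schedulableBuilder's buildPools method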
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      // Match on the scheduling mode
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
SchedulableBuilder is a trait that defines three methods:
1. rootPool: returns the root pool
2. buildPools: builds the pools
3. addTaskSetManager: adds a TaskSetManager to a pool
    private[spark] trait SchedulableBuilder {
      def rootPool: Pool

      def buildPools(): Unit

      def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
    }
SchedulableBuilder has two implementations, one per scheduling mode: FIFOSchedulableBuilder and FairSchedulableBuilder.
FIFOSchedulableBuilder is implemented as follows:
1. buildPools does nothing
2. addTaskSetManager adds the TaskSetManager directly to rootPool
    private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
      extends SchedulableBuilder with Logging {

      override def buildPools() {
        // nothing
      }

      override def addTaskSetManager(manager: Schedulable, properties: Properties) {
        rootPool.addSchedulable(manager)
      }
    }
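FairSchedulableBuilder is implemented as follows:
FairSchedulableBuilder reads the file named by the user-supplied setting spark.scheduler.allocation.file. If that setting is absent, it loads the default file fairscheduler.xml from the classpath, which normally means $SPARK_HOME/conf/fairscheduler.xml.
A sample fairscheduler.xml looks like this: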
    <allocations>
      <pool name="production">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>2</minShare>
      </pool>
      <pool name="test">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
      </pool>
    </allocations>
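The buildPools method:
Creates the scheduling pools.
1. Open an input stream on the configuration file and pass it to buildFairSchedulerPool, which parses the XML and creates the pools
2. Finally, call buildDefaultPool to create the default pool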
    override def buildPools() {
      var is: Option[InputStream] = None
      try {
        is = Option {
          schedulerAllocFile.map { f =>
            new FileInputStream(f)
          }.getOrElse {
            Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
          }
        }

        is.foreach { i => buildFairSchedulerPool(i) }
      } finally {
        is.foreach(_.close())
      }

      buildDefaultPool()
    }
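The buildFairSchedulerPool method:
Creates the pools declared in the fair scheduler configuration file.
1. Load the input stream as XML
2. Convert each configuration item in the XML into the corresponding pool attribute
For example, for pool name="production" in the XML, production becomes the pool name. The schedulingMode, minShare, and weight settings are read the same way, with defaults used for any that are missing. A pool is then created from these values and added to the root pool.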
    private def buildFairSchedulerPool(is: InputStream) {
      val xml = XML.load(is)
      for (poolNode <- (xml \\ POOLS_PROPERTY)) {

        val poolName = (poolNode \ POOL_NAME_PROPERTY).text
        var schedulingMode = DEFAULT_SCHEDULING_MODE
        var minShare = DEFAULT_MINIMUM_SHARE
        var weight = DEFAULT_WEIGHT

        val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
        if (xmlSchedulingMode != "") {
          try {
            schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
          } catch {
            case e: NoSuchElementException =>
              logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
                s"using the default schedulingMode: $schedulingMode")
          }
        }

        val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
        if (xmlMinShare != "") {
          minShare = xmlMinShare.toInt
        }

        val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
        if (xmlWeight != "") {
          weight = xmlWeight.toInt
        }

        val pool = new Pool(poolName, schedulingMode, minShare, weight)
        rootPool.addSchedulable(pool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, schedulingMode, minShare, weight))
      }
    }
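If you want to experiment with this parsing logic outside Spark, the following is a rough sketch of the same idea using the JDK's built-in DOM parser instead of scala.xml. `PoolConf` and `parse` are hypothetical names, and as a simplification the sketch falls back to the default mode for anything other than FAIR or FIFO, which is not necessarily identical to what SchedulingMode.withName accepts.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

object FairXmlDemo {
  // Hypothetical holder for one parsed <pool> element.
  final case class PoolConf(name: String, mode: String, minShare: Int, weight: Int)

  // Defaults mirror the constants quoted in this article.
  val DefaultMode = "FIFO"
  val DefaultMinShare = 0
  val DefaultWeight = 1

  def parse(xml: String): List[PoolConf] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val pools = doc.getElementsByTagName("pool")

    (0 until pools.getLength).toList.map { i =>
      val pool = pools.item(i).asInstanceOf[Element]

      // Text of the first descendant element with this tag, if present and non-empty.
      def child(tag: String): Option[String] = {
        val nodes = pool.getElementsByTagName(tag)
        if (nodes.getLength == 0) None
        else Option(nodes.item(0).getTextContent).map(_.trim).filter(_.nonEmpty)
      }

      // Simplification: unknown modes silently fall back to the default.
      val mode = child("schedulingMode")
        .filter(m => m == "FAIR" || m == "FIFO")
        .getOrElse(DefaultMode)

      PoolConf(
        pool.getAttribute("name"),
        mode,
        child("minShare").map(_.toInt).getOrElse(DefaultMinShare),
        child("weight").map(_.toInt).getOrElse(DefaultWeight))
    }
  }
}
```

Calling `FairXmlDemo.parse` on the sample fairscheduler.xml above should give one `PoolConf` per `<pool>` element, and a `<pool>` with no child elements picks up all three defaults.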
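The buildDefaultPool method:
Creates the default pool.
1. Check whether the root pool already contains the default pool
2. If it does not, create it
3. Add the default pool to the root pool
FairSchedulableBuilder predefines the default configuration constants: the default pool is named default, the default scheduling mode is FIFO, the default MINIMUM_SHARE is 0, and the default WEIGHT is 1.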
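    val DEFAULT_POOL_NAME = "default"
    val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
    val DEFAULT_MINIMUM_SHARE = 0
    val DEFAULT_WEIGHT = 1

    // Only create the default pool if the root pool does not already have one
    if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
      rootPool.addSchedulable(pool)
    }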
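The addTaskSetManager method:
Adds a TaskSetManager to a pool.
1. By default, the parent pool of a TaskSetManager is the default pool
2. If the TaskSet's properties are not null, read the pool name from the property keyed by FAIR_SCHEDULER_PROPERTIES, falling back to DEFAULT_POOL_NAME, and look up the pool with that name under rootPool as the parent pool
3. If no pool with that name exists, create one with that name and the default settings (scheduling mode, minShare, weight) and add it to rootPool
4. Add the TaskSetManager to the parent pool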
    override def addTaskSetManager(manager: Schedulable, properties: Properties) {
      var poolName = DEFAULT_POOL_NAME
      var parentPool = rootPool.getSchedulableByName(poolName)
      if (properties != null) {
        poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
        parentPool = rootPool.getSchedulableByName(poolName)
        if (parentPool == null) {
          parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
            DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
          rootPool.addSchedulable(parentPool)
          logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
            poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
        }
      }
      parentPool.addSchedulable(manager)
      logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
    }
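The lookup-or-create behaviour can be tried out with a small stand-alone sketch. `Registry` is a hypothetical stand-in for the root pool that maps pool names to the TaskSets placed in them. The key "spark.scheduler.pool" is my understanding of the value of FAIR_SCHEDULER_PROPERTIES, so treat it as an assumption and check it against your Spark version.

```scala
import java.util.Properties
import scala.collection.mutable

object PoolLookupDemo {
  val DefaultPoolName = "default"
  // Assumed value of FAIR_SCHEDULER_PROPERTIES; verify against your Spark version.
  val PoolProperty = "spark.scheduler.pool"

  // Hypothetical registry: pool name -> names of the task sets placed in it.
  final class Registry {
    // The default pool exists up front, as buildDefaultPool guarantees.
    val pools: mutable.LinkedHashMap[String, mutable.ListBuffer[String]] =
      mutable.LinkedHashMap(DefaultPoolName -> mutable.ListBuffer.empty[String])

    // Places the task set in a pool and returns that pool's name.
    def add(taskSet: String, properties: Properties): String = {
      val poolName =
        if (properties != null) properties.getProperty(PoolProperty, DefaultPoolName)
        else DefaultPoolName
      // An unknown pool name gets a new pool created on the fly.
      pools.getOrElseUpdate(poolName, mutable.ListBuffer.empty[String]) += taskSet
      poolName
    }
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry
    val props = new Properties()
    props.setProperty(PoolProperty, "adhoc")
    println(registry.add("TaskSet_0.0", null))
    println(registry.add("TaskSet_1.0", props))
  }
}
```

A TaskSet submitted without properties lands in the default pool, while one that names a pool not declared in fairscheduler.xml causes that pool to be created with default settings rather than failing.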
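That covers the scheduling algorithms in TaskSchedulerImpl and how the scheduling pools are created.
The next article continues with TaskSetManager and wraps up the remaining part of Task submission. See you there!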