Reading the Spark Source: TaskScheduler Scheduling Algorithms and Scheduling Pools


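Series guide

If you want to work through the Spark source code, please read the articles in the following order:

1. Spark Submit: job submission

2. Driver registration and startup

3. SparkContext initialization

4. TaskScheduler creation and startup

5. Application registration

6. Master registration mechanism: Application registration and the Application scheduling algorithm

7. Reading the Spark Source: Executor startup

8. CoarseGrainedExecutorBackend startup

9. DAGScheduler source code analysis

10. DAGScheduler Stage division and preferred Task location computation

11. TaskScheduler scheduling algorithms and scheduling pools

12. TaskScheduler task submission and resource allocation

13. Task startup

14. Task result handling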


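After the DAGScheduler has divided the job into Stages and computed the preferred location of each Task (for details, see: Reading the Spark Source: DAGScheduler Stage division and preferred Task location computation), it hands the Tasks over to the TaskScheduler: it packs multiple Tasks into a TaskSet and then calls taskScheduler.submitTasks.

Before looking at that method, let's first get to know TaskScheduler and TaskSchedulerImpl.

(For how the TaskScheduler is initialized, see: Reading the Spark Source: TaskScheduler creation and startup.)

TaskScheduler is a trait, and its only implementation in Spark is the TaskSchedulerImpl class.

TaskSchedulerImpl receives the TaskSet that the DAGScheduler creates for each Stage, allocates resources to Tasks according to the scheduling algorithm, and hands the Tasks to Executors in the Spark cluster to run. It retries Tasks that fail, and it uses speculative execution to reduce the impact of straggling Tasks on the overall progress of the job.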

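Here are the important components inside TaskSchedulerImpl:

1. Pool (scheduling pool): TaskSchedulerImpl schedules tasks on the basis of scheduling pools.

2. TaskSetManager: manages a TaskSet, including speculative execution and Task locality, and allocates resources to its Tasks.


Today we start with the Pool.

Much like YARN queues or the JVM GC roots, the scheduling pools have a root called rootPool, and the root pool contains multiple child pools.

A child pool can in turn contain other pools or TaskSetManagers.

(TaskSetManager is covered in the next article.)

So the scheduling pool as a whole is a multi-level tree structure.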
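To make the multi-level structure concrete, here is a minimal sketch of such a tree. It is not Spark's actual `Pool` or `TaskSetManager` code: `PoolNode`, `TaskSetLeaf`, `depth`, `leaves`, and the pool and task set names are all hypothetical, introduced only for illustration.

```scala
// Simplified, hypothetical model of the pool tree; not Spark's own classes.
object PoolTreeSketch {
  sealed trait Node { def name: String }

  // Leaf: stands in for a TaskSetManager.
  final case class TaskSetLeaf(name: String) extends Node

  // Inner node: stands in for a Pool, which may hold pools or leaves.
  final case class PoolNode(name: String, children: List[Node]) extends Node

  // Number of levels below and including this node; a leaf counts as 1.
  def depth(n: Node): Int = n match {
    case TaskSetLeaf(_) => 1
    case PoolNode(_, children) =>
      1 + children.map(depth).foldLeft(0)((a, b) => math.max(a, b))
  }

  // Names of all leaves, left to right.
  def leaves(n: Node): List[String] = n match {
    case TaskSetLeaf(name)     => List(name)
    case PoolNode(_, children) => children.flatMap(leaves)
  }

  // Root pool with two child pools, each holding task sets.
  val root: Node = PoolNode("", List(
    PoolNode("production", List(TaskSetLeaf("TaskSet_0.0"), TaskSetLeaf("TaskSet_1.0"))),
    PoolNode("test", List(TaskSetLeaf("TaskSet_2.0")))
  ))
}
```

In this sketch `PoolTreeSketch.depth(PoolTreeSketch.root)` is 3: the root pool, one level of child pools, and the task sets under them.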


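Creating a Pool takes four parameters:

1. poolName: the name of the pool

2. schedulingMode: the scheduling mode of the pool; there are three values: FAIR (fair scheduling), FIFO (first in, first out), and NONE

3. initMinShare: the initial value of minShare

4. initWeight: the initial value of weight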


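The Pool class has one member that deserves special attention: taskSetSchedulingAlgorithm.

How a pool schedules its TaskSets depends on the scheduling algorithm, which is selected by matching on the scheduling mode.

In FIFO mode the FIFO (first in, first out) algorithm is used; in FAIR mode the FAIR (fair scheduling) algorithm is used. Any other mode is rejected with an IllegalArgumentException.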

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
      case _ =>
        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
        throw new IllegalArgumentException(msg)
    }
  }

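Now let's look at how the two scheduling algorithms are implemented.

Both implement the org.apache.spark.scheduler.SchedulingAlgorithm trait.

The trait defines a single method that compares two Schedulables; it returns true when s1 should be scheduled before s2.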

def comparator(s1: Schedulable, s2: Schedulable): Boolean

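Implementation of the FIFO (first in, first out) scheduling algorithm

1. Get the priorities of the two Schedulables s1 and s2.

2. Compare the two priorities.

3. If the priorities are equal, compare the stageIds of the two Schedulables.

4. If the result is less than 0, s1 is scheduled first; otherwise s2 is scheduled first. In other words, the smaller priority value wins, and ties are broken by the smaller stageId.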

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}
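As a quick check of the FIFO rule, the sketch below applies the same comparison to a few entries and sorts them. `Entry`, `fifoLess`, and `order` are hypothetical names used only here; `Entry` is a stand-in that carries just the two fields the comparator reads, not Spark's `Schedulable`.

```scala
object FifoSketch {
  // Hypothetical stand-in for a Schedulable: only the fields FIFO looks at.
  final case class Entry(name: String, priority: Int, stageId: Int)

  // Same comparison as FIFOSchedulingAlgorithm.comparator shown above:
  // lower priority value first, ties broken by lower stageId.
  def fifoLess(s1: Entry, s2: Entry): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  // Names of the entries in the order FIFO would schedule them.
  def order(entries: Seq[Entry]): Seq[String] =
    entries.sortWith(fifoLess).map(_.name)
}
```

For example, `FifoSketch.order(Seq(Entry("c", 1, 3), Entry("a", 0, 1), Entry("b", 1, 2)))` yields `a`, `b`, `c`: the entry with priority 0 goes first, and the two entries with priority 1 are ordered by stageId.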

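Implementation of the FAIR (fair scheduling) algorithm

1. For the two Schedulables s1 and s2, obtain minShare and runningTasks, and derive whether each is "needy" (runningTasks < minShare), its minShareRatio (runningTasks / max(minShare, 1)), and its taskToWeightRatio (runningTasks / weight).

2. If s1 is running fewer tasks than its minShare while s2 is running at least its minShare, s1 has not been given enough resources, so s1 is scheduled first.

3. Conversely, if s1 is running at least its minShare while s2 is running fewer tasks than its minShare, s2 has not been given enough resources, so s2 is scheduled first.

4. If both s1 and s2 are running fewer tasks than their minShare, compare minShareRatio: if s1's minShareRatio is smaller than s2's, s1 is scheduled first, otherwise s2.

5. If both s1 and s2 are running at least their minShare, compare taskToWeightRatio: if s1's taskToWeightRatio is smaller than s2's, s1 is scheduled first, otherwise s2.

6. If the minShareRatio or taskToWeightRatio values are equal, compare the names of s1 and s2: if s1's name is smaller, s1 is scheduled first, otherwise s2.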

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
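The rules are easier to follow with concrete numbers. The sketch below restates the comparison on a simplified type and sorts four pools; `Entry`, `fairLess`, `order`, and the sample values are hypothetical and exist only for this example. It is a restatement under those assumptions, not Spark's code.

```scala
object FairSketch {
  // Hypothetical stand-in for a Schedulable: only the fields FAIR looks at.
  final case class Entry(name: String, runningTasks: Int, minShare: Int, weight: Int)

  // Same decision order as FairSchedulingAlgorithm.comparator shown above.
  def fairLess(s1: Entry, s2: Entry): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
    val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
    val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight.toDouble

    val compare =
      if (s1Needy && !s2Needy) -1      // only s1 is below its minShare
      else if (!s1Needy && s2Needy) 1  // only s2 is below its minShare
      else if (s1Needy && s2Needy) java.lang.Double.compare(minShareRatio1, minShareRatio2)
      else java.lang.Double.compare(taskToWeightRatio1, taskToWeightRatio2)

    // Ties fall back to comparing names.
    if (compare != 0) compare < 0 else s1.name < s2.name
  }

  def order(entries: Seq[Entry]): Seq[String] =
    entries.sortWith(fairLess).map(_.name)

  val sample: Seq[Entry] = Seq(
    Entry("A", runningTasks = 1, minShare = 2, weight = 1), // needy, minShareRatio 0.5
    Entry("B", runningTasks = 3, minShare = 2, weight = 1), // satisfied, taskToWeightRatio 3.0
    Entry("C", runningTasks = 2, minShare = 0, weight = 2), // satisfied, taskToWeightRatio 1.0
    Entry("D", runningTasks = 0, minShare = 3, weight = 1)  // needy, minShareRatio 0.0
  )
}
```

Here `FairSketch.order(FairSketch.sample)` gives `D`, `A`, `C`, `B`: the two needy pools come first, ordered by minShareRatio, and the two satisfied pools follow, ordered by taskToWeightRatio.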


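With the scheduling algorithms covered, let's look at how Pools are created.

The root pool is created in the initialize method of TaskSchedulerImpl.

1. Create rootPool with its parameters. The schedulingMode can be configured through the Spark property spark.scheduler.mode and defaults to FIFO.

2. Match on the scheduling mode to obtain the corresponding schedulableBuilder.

As in the Builder design pattern, the schedulableBuilder fills in the pool: once the pond has been created it is empty, and a builder is needed to create what goes inside it (for example, filling it with water).

3. Call the schedulableBuilder's buildPools method.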

 rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      // Match on the scheduling mode
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
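The snippet above assumes that schedulingMode has already been derived from spark.scheduler.mode. A minimal sketch of that step follows. The `SchedulingMode` enumeration mirrors the three values named earlier in this article; `parseMode`, the plain `Map` standing in for the Spark configuration, and the error message text are assumptions made for illustration.

```scala
object ModeSketch {
  // The three scheduling modes described earlier in this article.
  object SchedulingMode extends Enumeration {
    val FAIR, FIFO, NONE = Value
  }

  // Hypothetical helper: reads spark.scheduler.mode from a plain Map
  // (standing in for the Spark configuration) and defaults to FIFO.
  def parseMode(conf: Map[String, String]): SchedulingMode.Value = {
    val raw = conf.getOrElse("spark.scheduler.mode", "FIFO")
    try {
      SchedulingMode.withName(raw.toUpperCase(java.util.Locale.ROOT))
    } catch {
      case _: NoSuchElementException =>
        throw new IllegalArgumentException(s"Unrecognized spark.scheduler.mode: $raw")
    }
  }
}
```

With an empty map the result is `FIFO`, and `Map("spark.scheduler.mode" -> "fair")` gives `FAIR`, because the value is upper-cased before the lookup in this sketch.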

SchedulableBuilder is a trait that defines three methods:

1. rootPool: returns the root pool

2. buildPools: builds the scheduling pools

3. addTaskSetManager: adds a TaskSetManager to a scheduling pool

private[spark] trait SchedulableBuilder {
  def rootPool: Pool
  def buildPools(): Unit
  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}

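SchedulableBuilder has two implementations, one for each scheduling mode: FIFOSchedulableBuilder and FairSchedulableBuilder.

FIFOSchedulableBuilder is implemented as follows:

1. The buildPools method does nothing.

2. The addTaskSetManager method adds the TaskSetManager directly to rootPool.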

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}

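FairSchedulableBuilder is implemented as follows:

FairSchedulableBuilder reads the file named by the user-supplied property spark.scheduler.allocation.file. If that property is not set, it falls back to the default configuration file fairscheduler.xml, which it loads from the classpath (typically $SPARK_HOME/conf/fairscheduler.xml).

An example fairscheduler.xml looks like this: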

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

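The buildPools method

Creates the scheduling pools.

1. Open an input stream on the allocation file (or on the default file from the classpath) and pass it to buildFairSchedulerPool, which parses the XML configuration and creates the pools. The stream is closed in the finally block.

2. Finally, call buildDefaultPool to create the default pool.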

 override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option {
        schedulerAllocFile.map { f =>
          new FileInputStream(f)
        }.getOrElse {
          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        }
      }

      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {
      is.foreach(_.close())
    }

    buildDefaultPool()
  }

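The buildFairSchedulerPool method

Creates the pools declared in the fair scheduler allocation file.

1. Load the input stream as XML.

2. Convert each configuration item in the XML into the corresponding property of a pool.

For example, for <pool name="production"> it uses production as the pool name, and in the same way reads the schedulingMode, minShare, and weight settings (falling back to the defaults when an element is missing or empty). It then creates a pool from these values and adds the new pool to the root pool.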

  private def buildFairSchedulerPool(is: InputStream) {
    val xml = XML.load(is)
    for (poolNode <- (xml \\ POOLS_PROPERTY)) {

      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
      var schedulingMode = DEFAULT_SCHEDULING_MODE
      var minShare = DEFAULT_MINIMUM_SHARE
      var weight = DEFAULT_WEIGHT

      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
      if (xmlSchedulingMode != "") {
        try {
          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
        } catch {
          case e: NoSuchElementException =>
            logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
              s"using the default schedulingMode: $schedulingMode")
        }
      }

      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
      if (xmlMinShare != "") {
        minShare = xmlMinShare.toInt
      }

      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
      if (xmlWeight != "") {
        weight = xmlWeight.toInt
      }

      val pool = new Pool(poolName, schedulingMode, minShare, weight)
      rootPool.addSchedulable(pool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, schedulingMode, minShare, weight))
    }
  }
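To see what the parsing step produces for the sample file shown earlier, here is a standalone sketch. Spark itself uses scala.xml as in the code above; this sketch swaps in the JDK's built-in DOM parser so that it runs without extra dependencies. `PoolConf`, `childText`, and `parse` are hypothetical names, and the defaults (FIFO, 0, 1) follow the constants described in this article.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

object AllocationFileSketch {
  // Hypothetical value type holding one pool's settings.
  final case class PoolConf(name: String, mode: String, minShare: Int, weight: Int)

  // Text of the first child element with this tag, or the fallback
  // when the element is missing or empty.
  private def childText(pool: Element, tag: String, fallback: String): String = {
    val nodes = pool.getElementsByTagName(tag)
    val text = if (nodes.getLength > 0) nodes.item(0).getTextContent.trim else ""
    if (text.nonEmpty) text else fallback
  }

  // Parse an allocation file given as a string into one PoolConf per <pool>.
  def parse(xml: String): List[PoolConf] = {
    val in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in)
    val pools = doc.getElementsByTagName("pool")
    (0 until pools.getLength).toList.map { i =>
      val pool = pools.item(i).asInstanceOf[Element]
      PoolConf(
        name = pool.getAttribute("name"),
        mode = childText(pool, "schedulingMode", "FIFO"),
        minShare = childText(pool, "minShare", "0").toInt,
        weight = childText(pool, "weight", "1").toInt
      )
    }
  }
}
```

Applied to the sample fairscheduler.xml, `parse` returns `PoolConf("production", "FAIR", 2, 1)` followed by `PoolConf("test", "FIFO", 3, 2)`. Unlike the Spark code, this sketch does not validate the scheduling mode string.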

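The buildDefaultPool method:

Creates the default scheduling pool.

1. Check whether the root pool already contains the default pool.

2. If it does not, create it.

3. Add the default pool to the root pool.

FairSchedulableBuilder predefines the default configuration constants: the default pool is named default, the default scheduling mode is FIFO, the default minimum share is 0, and the default weight is 1.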

  val DEFAULT_POOL_NAME = "default"
  val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
  val DEFAULT_MINIMUM_SHARE = 0
  val DEFAULT_WEIGHT = 1

  // Inside buildDefaultPool, when rootPool has no "default" pool yet:
  val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
    DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
  rootPool.addSchedulable(pool)

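The addTaskSetManager method:

Adds a TaskSetManager to a scheduling pool.

1. By default, the parent pool of a TaskSetManager is the default pool.

2. If the TaskSet's properties are not null, read the pool name from the FAIR_SCHEDULER_PROPERTIES key (spark.scheduler.pool), falling back to DEFAULT_POOL_NAME, and use the pool with that name as the TaskSetManager's parent pool.

3. If no pool with that name exists under the root pool, create one with that name using the default scheduling mode, minShare, and weight, and add it to the root pool.

4. Add the TaskSetManager to the parent pool.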

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    var poolName = DEFAULT_POOL_NAME
    var parentPool = rootPool.getSchedulableByName(poolName)
    if (properties != null) {
      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      parentPool = rootPool.getSchedulableByName(poolName)
      if (parentPool == null) {
        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
        rootPool.addSchedulable(parentPool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      }
    }
    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  }
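The pool lookup can be tried out in isolation with the sketch below. It models only the name resolution and the create-on-demand behavior, with pools reduced to lists of task set names. `Registry`, `addTaskSet`, `poolNames`, and `members` are hypothetical; the property key `spark.scheduler.pool` is the value of FAIR_SCHEDULER_PROPERTIES, which an application normally sets per thread with `sc.setLocalProperty("spark.scheduler.pool", "production")`.

```scala
import java.util.Properties
import scala.collection.mutable

object PoolLookupSketch {
  val DefaultPoolName = "default"
  val PoolProperty = "spark.scheduler.pool"

  // Hypothetical registry: pool name -> names of the task sets added to it.
  final class Registry(initialPools: Seq[String]) {
    private val pools = mutable.LinkedHashMap.empty[String, mutable.ListBuffer[String]]
    initialPools.foreach(n => pools.getOrElseUpdate(n, mutable.ListBuffer.empty[String]))
    // Mirrors buildDefaultPool: make sure the default pool always exists.
    pools.getOrElseUpdate(DefaultPoolName, mutable.ListBuffer.empty[String])

    // Resolve the pool name from the properties (default pool when they are
    // null or carry no pool name), create the pool if it is unknown, and
    // add the task set to it. Returns the name of the pool that was used.
    def addTaskSet(taskSetName: String, properties: Properties): String = {
      val poolName =
        if (properties != null) properties.getProperty(PoolProperty, DefaultPoolName)
        else DefaultPoolName
      val members = pools.getOrElseUpdate(poolName, mutable.ListBuffer.empty[String])
      members += taskSetName
      poolName
    }

    def poolNames: List[String] = pools.keys.toList
    def members(pool: String): List[String] = pools.get(pool).map(_.toList).getOrElse(Nil)
  }
}
```

A task set submitted with null properties lands in `default`, while one whose properties name an unknown pool such as `adhoc` causes that pool to be created on the fly, which matches steps 1 to 4 above.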

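With the walkthrough above, we have covered the scheduling algorithms of TaskSchedulerImpl and how its scheduling pools are created.

The next article continues with TaskSetManager and wraps up the Task submission part. See you there!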

赫墨拉

I'm a beginner who loves big data, and this blog is where I share what I learn and experience along the way.