Spark源码阅读:TaskScheduler的创建和启动



系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


上一盘文章介绍了SparkContext的初始化

其中,主要阅读TaskScheduler、DAGScheduler的创建和启动

TaskScheduler、DAGScheduler的创建和启动步骤有很多,每个步骤分开写

这篇文章简要看一下TaskScheduler的创建


相关代码如下:

SparkContext调用createTaskScheduler,传入参数为sparkContext,master,deployMode

返回一个元组,元组内元素为schedulerBackend,taskScheduler

 //创建TaskScheduler调度器
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // 在taskScheduler设置好DAGScheduler的引用后,开启TaskScheduler
    // 构造器
    _taskScheduler.start()

createTaskScheduler方法根据传入master进行模式匹配

我们提交程序指定的master为spark://192.168.1.20:7077

根据spark master进行正则进行匹配

/**
  * 从master字符串中提取信息的正则集合
  */
private object SparkMasterRegex {
  val SPARK_REGEX = """spark://(.*)""".r
}

根据master匹配结果,创建TaskSchedulerImpl对象,创建StandaloneSchedulerBackend对象

关于TaskSchedulerImpl,org.apache.spark.scheduler.TaskScheduler是一个trait,它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接受不同Stage的任务,并且向集群提交这些任务,并对未执行、特别慢的任务启动备份任务。

TaskScheduler是以后实现多种任务调度器的基础,不过当前的org.apache.spark.scheduler.TaskSchedulerImpl是唯一实现。

接下来调用TaskSchedulerImpl的initialize初始化方法

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

TaskSchedulerImpl的initialize初始化方法

任务的最终调度实际上都是落实到接口Scheduler的具体实现上的

关于TaskSchedulerImpl的调度池和调度算法请查看:Spark源码阅读:TaskScheduler调度算法与调度池

TaskSchedulerImpl的调度模式有FAIR和FIFO两种

以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:

1)使TaskSchedulerImpl持有LocalBackend的引用

2)创建调度池rootPool,rootPool中缓存了调度队列、调度算法及TaskSetManager集合等信息

3)创建FIFOSchedulableBuilder,用来操作rootPool中的调度队列

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    //创建调度池
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      //  根据调度模式匹配
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

初始化方法执行完毕后,将创建好的 (backend, scheduler)返回


SparkContext得到返回元组 (backend, scheduler)

 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

元组中sched,即schedulerBackend为StandaloneSchedulerBackend

_schedulerBackend = sched

元组中ts,即taskScheduler为TaskSchedulerImpl

_taskScheduler = ts

接下来创建DAGScheduler

DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。而且DAG在不同的资源管理框架下的实现是相同的。DAG将这组Task划分完成后,会将这组Task提交到TaskScheduler。TaskScheduler通过Cluster Manager在集群的某个Worker的Executor上启动任务。

DAGScheduler相关内容请查看:

Spark源码阅读:DAGScheduler源码分析

Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算

_dagScheduler = new DAGScheduler(this)

在taskScheduler设置好DAGScheduler的引用后,开启TaskScheduler

_taskScheduler.start()

TaskSchedulerImpl的start方法中调用了backend的start方法,即StandaloneSchedulerBackend的start方法

override def start() {
    //  执行backend的start方法
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

来看StandaloneSchedulerBackend

org.apache.spark.scheduler.SchedulerBackend是一个trait,是TaskScheduler的调度后端接口

TaskScheduler给Task分配集群资源实际是通过SchedulerBackend完成的

SchedulerBackend向当前等待分配计算资源的Task分配资源,分配完后与分配Task的Executor进行通信,在该Executor上启动Task,进行计算

在standalone模式下,StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackendTaskScheduler Standalone Cluster Manager后端的实现

作用于Driver内,用于Standalone模式下资源管理器、与Executor通信,Task资源分配

StandaloneSchedulerBackend的start方法,调用父类CoarseGrainedSchedulerBackend的start方法

override def start() {
    //  调用CoarseGrainedSchedulerBackend的start方法
    super.start()

CoarseGrainedSchedulerBackend的start方法,调用createDriverEndpointRef,根据sparkConf创建DriverEndpoint

 override def start() {
    val properties = new ArrayBuffer[(String, String)]
    //  获取sparkConf,添加到ArrayBuffer中
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }  //  创建DriverEndpoint
    driverEndpoint = createDriverEndpointRef(properties)
  }

createDriverEndpointRef该方法获取Driver EndpointRef,但最开始的操作为向RpcEnv注册DriverEndpoint

 protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    //  获取Driver EndpointRef
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

Driver Endpoint向RpcEnv注册,创建了一个DriverEndpoint,DriverEndpoint用于提交task到Executor,接收Executor返回的计算结果

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    //  向RpcEnv注册DriverEndpoint
    new DriverEndpoint(rpcEnv, properties)
  }

StandaloneSchedulerBackend start主要作用是创建AppClient,用于注册spark application,关于application的注册,请访问Spark源码阅读:Application的注册

至此,整个TaskScheduler的创建和启动完成,我们下篇文章见~

亲,看完了点个赞呗!

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

电子邮件地址不会被公开。