Spark源码阅读:TaskScheduler的创建和启动
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
上一盘文章介绍了SparkContext的初始化
其中,主要阅读TaskScheduler、DAGScheduler的创建和启动
TaskScheduler、DAGScheduler的创建和启动步骤有很多,每个步骤分开写
这篇文章简要看一下TaskScheduler的创建
相关代码如下:
SparkContext调用createTaskScheduler,传入参数为sparkContext,master,deployMode
返回一个元组,元组内元素为schedulerBackend,taskScheduler
//创建TaskScheduler调度器 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) // 在taskScheduler设置好DAGScheduler的引用后,开启TaskScheduler // 构造器 _taskScheduler.start()
createTaskScheduler方法根据传入master进行模式匹配
我们提交程序指定的master为spark://192.168.1.20:7077
根据spark master进行正则进行匹配
/** * 从master字符串中提取信息的正则集合 */ private object SparkMasterRegex { val SPARK_REGEX = """spark://(.*)""".r }
根据master匹配结果,创建TaskSchedulerImpl对象,创建StandaloneSchedulerBackend对象
关于TaskSchedulerImpl,org.apache.spark.scheduler.TaskScheduler是一个trait,它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接受不同Stage的任务,并且向集群提交这些任务,并对未执行、特别慢的任务启动备份任务。
TaskScheduler是以后实现多种任务调度器的基础,不过当前的org.apache.spark.scheduler.TaskSchedulerImpl是唯一实现。
接下来调用TaskSchedulerImpl的initialize初始化方法
case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler)
TaskSchedulerImpl的initialize初始化方法
任务的最终调度实际上都是落实到接口Scheduler的具体实现上的
关于TaskSchedulerImpl的调度池和调度算法请查看:Spark源码阅读:TaskScheduler调度算法与调度池
TaskSchedulerImpl的调度模式有FAIR和FIFO两种
以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:
1)使TaskSchedulerImpl持有LocalBackend的引用
2)创建调度池rootPool,rootPool中缓存了调度队列、调度算法及TaskSetManager集合等信息
3)创建FIFOSchedulableBuilder,用来操作rootPool中的调度队列
def initialize(backend: SchedulerBackend) { this.backend = backend //创建调度池 rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { // 根据调度模式匹配 schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode") } } schedulableBuilder.buildPools() }
初始化方法执行完毕后,将创建好的 (backend, scheduler)返回
SparkContext得到返回元组 (backend, scheduler)
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
元组中sched,即schedulerBackend为StandaloneSchedulerBackend
_schedulerBackend = sched
元组中ts,即taskScheduler为TaskSchedulerImpl
_taskScheduler = ts
接下来创建DAGScheduler
DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。而且DAG在不同的资源管理框架下的实现是相同的。DAG将这组Task划分完成后,会将这组Task提交到TaskScheduler。TaskScheduler通过Cluster Manager在集群的某个Worker的Executor上启动任务。
DAGScheduler相关内容请查看:
Spark源码阅读:DAGScheduler Stage划分与Task最佳位置计算
_dagScheduler = new DAGScheduler(this)
在taskScheduler设置好DAGScheduler的引用后,开启TaskScheduler
_taskScheduler.start()
TaskSchedulerImpl的start方法中调用了backend的start方法,即StandaloneSchedulerBackend的start方法
override def start() { // 执行backend的start方法 backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) } }
来看StandaloneSchedulerBackend
org.apache.spark.scheduler.SchedulerBackend是一个trait,是TaskScheduler的调度后端接口
TaskScheduler给Task分配集群资源实际是通过SchedulerBackend完成的
SchedulerBackend向当前等待分配计算资源的Task分配资源,分配完后与分配Task的Executor进行通信,在该Executor上启动Task,进行计算
在standalone模式下,StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend是TaskScheduler Standalone Cluster Manager后端的实现
作用于Driver内,用于Standalone模式下资源管理器、与Executor通信,Task资源分配
StandaloneSchedulerBackend的start方法,调用父类CoarseGrainedSchedulerBackend的start方法
override def start() { // 调用CoarseGrainedSchedulerBackend的start方法 super.start()
CoarseGrainedSchedulerBackend的start方法,调用createDriverEndpointRef,根据sparkConf创建DriverEndpoint
override def start() { val properties = new ArrayBuffer[(String, String)] // 获取sparkConf,添加到ArrayBuffer中 for ((key, value) <- scheduler.sc.conf.getAll) { if (key.startsWith("spark.")) { properties += ((key, value)) } } // 创建DriverEndpoint driverEndpoint = createDriverEndpointRef(properties) }
createDriverEndpointRef该方法获取Driver EndpointRef,但最开始的操作为向RpcEnv注册DriverEndpoint
protected def createDriverEndpointRef( properties: ArrayBuffer[(String, String)]): RpcEndpointRef = { // 获取Driver EndpointRef rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties)) }
Driver Endpoint向RpcEnv注册,创建了一个DriverEndpoint,DriverEndpoint用于提交task到Executor,接收Executor返回的计算结果
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { // 向RpcEnv注册DriverEndpoint new DriverEndpoint(rpcEnv, properties) }
StandaloneSchedulerBackend start主要作用是创建AppClient,用于注册spark application,关于application的注册,请访问:Spark源码阅读:Application的注册
至此,整个TaskScheduler的创建和启动完成,我们下篇文章见~
亲,看完了点个赞呗!