Spark源码阅读:Application的注册与Application的调度算法
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
StandaloneAppClient向Master发送注册application消息后
masterRef.send(RegisterApplication(appDescription, self))
进入org.apache.spark.deploy.master,查看Master处理Application注册的源码
Master接收到消息后,做出如下操作
1.判断Master状态
2.传入applicationDescription和driver,调用createApplication方法,创建application,得到封装的ApplicationInfo,即app
3.将app传入,注册application,将app加入内存缓存,将app加入等待队列
4.持久化引擎将app持久化
5.driver向StandaloneAppClient发送消息,application已注册,即RegisteredApplication
6.最后调用scheduler方法,安排调度
/** * 处理Application注册的请求 */ case RegisterApplication(description, driver) => // 防止一些Driver的重复注册 // 如果master的状态是standby时,注册Application,无视该注册,没有反馈 if (state == RecoveryState.STANDBY) { } else { logInfo("Registering app " + description.name) // 传入description和driver,去创建Application,得到一个封装好的ApplicationInfo val app = createApplication(description, driver) // 将封装好的ApplicationInfo传入,注册Application // 将app加入内存缓存,将app加入等待队列 registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) // 使用持久化引擎将app持久化 persistenceEngine.addApplication(app) // driver发送消息,向StandaloneSchedulerBackend的StandaloneAppClient发送消息RegisteredApplication,已注册Application driver.send(RegisteredApplication(app.id, self)) // 然后调用schedule方法,去进行调度 schedule() // 至此,Application注册完成 }
从createApplication开始,该方法根据将当前时间戳,ApplicationDescription,app id,日期,driver,默认cpu core数信息封装成了ApplicationInfo,并返回
// 创建Application,返回封装好的ApplicationInfo private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): ApplicationInfo = { // 获取当前系统时间 val now = System.currentTimeMillis() // 根据时间戳生成日期 val date = new Date(now) // 根据实际生成application id val appId = newApplicationId(date) // 将时间戳,ApplicationDescription,app id,日期,driver,默认cpu core数信息封装成了ApplicationInfo,并返回 new ApplicationInfo(now, appId, desc, date, driver, defaultCores) }
接收返回的ApplicationInfo,即app,接下来注册application调用registerApplication
1.首先判断driver地址是否存在,避免重复注册application
2.将app添加到apps HashSet中
3.将app id添加到idToApp HashMap中
4.将app driver添加到endpointToApp HashMap中
5.将app rpc地址添加到addressToApp HashMap中
6.将app添加到等待执行waitingApps ArrayBuffer中
// 注册application private def registerApplication(app: ApplicationInfo): Unit = { // 获取driver地址 val appAddress = app.driver.address // 判断driver地址是否存在,避免重复注册 if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } applicationMetricsSystem.registerSource(app.appSource) // 将app加入HashSet中 apps += app // 将app id添加到idToApp HashMap中 idToApp(app.id) = app // 将app driver添加到endpointToApp HashMap中 endpointToApp(app.driver) = app // 将app rpc地址添加到addressToApp HashMap中 addressToApp(appAddress) = app // 将app添加到等待执行waitingApps队列中 waitingApps += app if (reverseProxy) { webUi.addProxyTargets(app.id, app.desc.appUiUrl) } }
接下来持久化引擎将app持久化
driver向StandaloneAppClient发送消息,application已注册,即RegisteredApplication
重点为最后调用的schedule( )方法
来看scheduler方法,前文driver注册与启动阅读过相关driver源码,略过driver部分
1.首先判断master的状态
2.调用startExecutorsOnWorkers方法
/** * 在等待的应用程序中分配当前可用的资源 * 每当一个新的应用程序加入或资源可用性改变时,这个方法将被调用。 */ private def schedule(): Unit = { // 先来判断,当前master的状态,如果不是Alive,什么都不做 if (state != RecoveryState.ALIVE) { return } // 调用startExecutorsOnWorkers方法 startExecutorsOnWorkers() }
startExecutorsOnWorkers
先对worker资源进行筛选,cpu core数多的worker排在前面,将可用的worker添加到可用的usableWorkers集合中
调用scheduleExecutorsOnWorkers,为Application分配要在Worker上启动的Executor运行的Cpu Core数
分配好了executor 的core数
最后调用allocateWorkerResourceToExecutors,按照分配规则,分配worker资源给execut
// 在worker上调度和启动executor private def startExecutorsOnWorkers(): Unit = { // Spark默认为application启动Executor方式是FIFO调度,所有的application都是放在调度等待队列中 // 判断app剩余core数是否大于0,如果大于,遍历waitingApps for (app <- waitingApps if app.coresLeft > 0) { // 每个executor的cpu core val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // 获取可用的worker,过滤出Alive Worker,且该worker的资源大于app对每个executor所需资源,并根据CPU数进行排序,CPU数空闲多的排在前面 val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse // 分配core数,给worker分配executor val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) // 已经决定了每个Worker分配了多少个cores,开始分配 for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { allocateWorkerResourceToExecutors( app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) } } }
关于scheduleExecutorsOnWorkers,该方法为Application分配要在Worker上启动的Executor返回一个数组,包括每个worker分配到的core数
这里就要说一下重点的Spark Application调度算法了
该调度算法为已注册的Application(但没有完成调度的),调度Worker节点上的Executor
有两种调度算法:
第一种:Spread Out
将一个Application的Executor Core数尽可能分配到更多的Worker上,甚至分配集群上所有的Worker上,这种方式往往会带来潜在的更好的数据本地性
第二种:非Spread Out
尝试将worker节点的空余的Cores都用尽了,才会去下一个worker节点分配,第二种方法是默认的,默认情况下,每个Worker只能启动一个Executor
为了更好的理解,举个例子
现有5个Worker,每个节点剩余资源如下,根据Core排序:
剩余Core | 剩余内存 | |
Worker1 | 7 | 5G |
Worker2 | 6 | 3G |
Worker3 | 3 | 4G |
Worker4 | 2 | 300M |
Worker5 | 1 | 2G |
现有一个用户提交application,该application最多需要12个core,每个executor 内存为500M,每个executor core数为2
在StartExecutorsOnWorkers方法中先对worker进行筛选,Worker4 不满足每个executor内存需求,Worker 5不满足每个executor core需求
将筛选出来的Worker1、Worker2、Worker3添加到usableWorkers中,下面就要对资源进行调度
Spread Out
第一次调度后,剩余资源如下
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 5 | 4.5G | 2 | 500M |
Worker2 | 6 | 3G | 0 | 0 |
Worker3 | 3 | 4G | 0 | 0 |
如上,可以发现只有Worker 1的可用资源变少了,Worker 2,3没有变化,因为第一次调度时,只在Worker 1上分配了一个executor,这个executor占用2个core,500M内存
接下来去Worker 2上去分配,分配后剩余资源如下
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 5 | 4.5G | 2 | 500M |
Worker2 | 4 | 2.5G | 2 | 500M |
Worker3 | 3 | 4G | 0 | 0 |
然后去Worker 3上分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 5 | 4.5G | 2 | 500M |
Worker2 | 4 | 2.5G | 2 | 500M |
Worker3 | 1 | 3.5G | 2 | 500M |
现在已经分配了3个executor,6个core,1.5G内存,但是资源还没有分配完,接着进行下一轮分配
还是先给Worker 1进行分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 3 | 4G | 4 | 1G |
Worker2 | 4 | 2.5G | 2 | 500M |
Worker3 | 1 | 3.5G | 2 | 500M |
给Worker 2分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 3 | 4G | 4 | 1G |
Worker2 | 2 | 2G | 4 | 1G |
Worker3 | 1 | 3.5G | 2 | 500M |
给Worker 3进行分配的时候,发现Worker 3的资源已经不够了,所以,又来了一轮,再给Worker 1进行分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 1 | 3.5G | 6 | 1.5M |
Worker2 | 4 | 2.5G | 4 | 1G |
Worker3 | 1 | 3.5G | 2 | 500M |
现在12个core全部分配,资源分配完毕,共启动了6个executor,worker 1上有3个executor,worker 2上有2个executor,worker 3上有1个executor
非Spread Out 模式
给Worker 1进行分配资源
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 5 | 4.5G | 2 | 500M |
Worker2 | 6 | 3G | 0 | 0 |
Worker3 | 3 | 4G | 0 | 0 |
由于Worker 1上还有资源可以分配,所以继续给Worker 1分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 3 | 4G | 4 | 1G |
Worker2 | 6 | 3G | ||
Worker3 | 3 | 4G |
Worker 1上仍然有资源可以分配,继续给Worker 1分配
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 1 | 3.5G | 6 | 1.5G |
Worker2 | 6 | 3G | ||
Worker3 | 3 | 4G |
分配3次之后,Worker 1资源已经不够用了,这里还有6个core没有分配,去Worker 2上进行分配,以此类推,直到分配的资源满足application,停止分配,最后Worker资源如下
剩余Core | 剩余内存 | 分配的Core | 分配的内存 | |
Worker1 | 1 | 3.5G | 6 | 1.5G |
Worker2 | 0 | 1.5G | 6 | 1.5G |
Worker3 | 3 | 4G |
栗子举完了,下面来看源码,会更好理解
1.指定每个executor需要的资源
2.通过指定的资源去找符合要求的worker,其中canLaunchExecutor,就是其中找符合要求的过程,即worker的资源是否满足启动executor条件,得到一个满足条件的worker列表
3.再根据调度模式,遍历满足条件的worker列表
如果是Spread Out,给第一个得到的worker启动executor,然后结束分配资源,去下一个Worker继续进行分配executor,然后去下一个worker进行分配,继续去下一个worker分配,直到满足了application的需求
如果是非Spread Out模式,则一直在这个worker上启动executor,直到这个worker资源分配尽,如果还没有满足application的需求,则去下一个worker上一直分配,以此类推,直到满足application的需求
4.最后返回给每个worker分配好的core数组
private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { // 每个Executor的core数 val coresPerExecutor = app.desc.coresPerExecutor // 每个Executor最小分配core数 val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // 每个worker一个executor,布尔类型 val oneExecutorPerWorker = coresPerExecutor.isEmpty // 每个Executor的内存 val memoryPerExecutor = app.desc.memoryPerExecutorMB // numUsable array长度 val numUsable = usableWorkers.length // Worker上已经分配的core数 val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker // Worker上已经分配的executor数量 val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker // app实际会分配的Cpu的数量,取app剩余的还没有被分配的CPU的数量和可以使用的workers的cpu数量总和之间的最小值 var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // 是否可以在Worker上分配Executor def canLaunchExecutor(pos: Int): Boolean = { // 判断是否能一直调度,需要分配的core大于等于executor最小分配core数 val keepScheduling = coresToAssign >= minCoresPerExecutor // 判断worker是否还有足够的core资源 val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor // 如果允许worker为同一个application启动多个executor,或者在worker上没有启动过这个application的executor,那么就会创建一个executor // 如果在这个worker上已经有executor,则给这个executor更多的cpu core val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 if (launchingNewExecutor) { // 分配的内存总量 = 分配的Executor数 * 每个executor分配的内存 val assignedMemory = assignedExecutors(pos) * memoryPerExecutor // 判断是否有足够的内存 // 可用的worker剩余内存 - 分配的内存总量 >= 每个executor分配的内存 val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor // 判断是否满足application需求 val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit keepScheduling && enoughCores && enoughMemory && underLimit } else { // oneExecutorPerWorker机制下,不检测内存和executor限制,很重要 keepScheduling && enoughCores } } // 通过canLaunchExecutor判断是否能启动executor // 如果能,则一直启动executor,直到该Worker资源不足,或满足了Application需求 var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) // 开始在worker上分配 while (freeWorkers.nonEmpty) { // 遍历freeWorkers freeWorkers.foreach { pos => var keepScheduling = true // 再次判断是否能启动executor while (keepScheduling && canLaunchExecutor(pos)) { // 更新已经分配的core数 coresToAssign -= minCoresPerExecutor assignedCores(pos) += minCoresPerExecutor // 如果只允许每个worker上启动一个executor,那么worker分配的executor数量为1 // 如果允许worker启动多个executor,就是在原来的executor上加1 if (oneExecutorPerWorker) { assignedExecutors(pos) = 1 } else { assignedExecutors(pos) += 1 } // Spreading out Application方式意味着在尽可能多的Worker中分配executor。 // 如果不使用Spreading out算法,就在一个worker上持续分配executor,直到用完worker的资源 // 如果使用Spreading Out算法,就去下一个worker上分配executor // 如果使用Spreading Out算法,将keepScheduling修改为false // 不需要在这个worker上继续进行分配,去下一个worker上分配 if (spreadOutApps) { keepScheduling = false } } } // 过滤不满足条件的worker freeWorkers = freeWorkers.filter(canLaunchExecutor) } // 返回每个worker应该被分配的core数 assignedCores }
scheduleExecutorsOnWorkers分配好资源后,返回分配的数组,接下来就是对每个worker按照分配的数组进行分配资源
调用allocateWorkerResourceToExecutors给每个worker进行分配
allocateWorkerResourceToExecutors方法中
1.计算这个worker需要分配executor数量
2.获取worker上每一次为executor需要分配的cores的数量
3.根据需要分配的executor数量,调用app.addExecutor为application添加executor信息
4.调用launchExecutor,启动executor
5.将application的状态修改为Running
private def allocateWorkerResourceToExecutors( app: ApplicationInfo, assignedCores: Int, coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { // 这个worker需要分配executor数量 val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) // 这个worker上每一次为executor需要分配的cores的数量 val coresToAssign = coresPerExecutor.getOrElse(assignedCores) for (i <- 1 to numExecutors) { // 调用addExecutor方法,为application添加executor val exec = app.addExecutor(worker, coresToAssign) // 启动executor launchExecutor(worker, exec) // 将application的状态改为Running app.state = ApplicationState.RUNNING } }
至此,Master处理application的注册和application的调度算法看完了。
下一篇文章,继续阅读launchExecutor,查看exeuctor的启动。
下篇文章见~