Reading the Spark Source Code: Launching Tasks
Series guide:
If you want to work through the Spark source code, please read the articles in this series in order:
6. The Master registration mechanism: Application registration and the Application scheduling algorithm
8. Launching CoarseGrainedExecutorBackend
10. DAGScheduler: Stage splitting and computing the best locations for Tasks
13. Launching Tasks
14. Handling Task results
The previous article covered TaskScheduler's submitTasks method, which submits tasks. submitTasks calls the backend's reviveOffers method to allocate resources to the tasks; once resources are allocated, the tasks are launched.
(For the background, see: Reading the Spark Source Code: TaskScheduler task submission and resource allocation)
VV has summarized the whole task submission flow in the figure below.
Today we read the source code for the final step: launching the Task.
CoarseGrainedSchedulerBackend's makeOffers method has four main steps:
1. Filter out dead executors.
2. Wrap every executor still available to this application into a WorkerOffer; each WorkerOffer describes the CPU resources available on one executor (a simplified sketch of offer-based assignment follows the code below).
3. Pass the WorkerOffers to the scheduler's resourceOffers method; the scheduler here is a TaskSchedulerImpl, so this actually calls TaskSchedulerImpl.resourceOffers.
4. Take the sequence of tasks, now with resources assigned, that scheduler.resourceOffers returns and pass it to launchTasks to launch them. We will look at launchTasks below.
private def makeOffers() {
  // Filter out dead executors
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Wrap every executor available to this application into a WorkerOffer;
  // each WorkerOffer describes the CPU resources available on one executor
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // Pass the workOffers to scheduler.resourceOffers,
  // then hand its return value to launchTasks
  launchTasks(scheduler.resourceOffers(workOffers))
}
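resourceOffers itself was covered in the previous article, but to make the WorkerOffer idea concrete, here is a minimal, self-contained sketch of offer-based assignment. It is not TaskSchedulerImpl's real logic (which also handles locality, delay scheduling, and blacklisting); SimpleOffer, assign, and the demo values are all illustrative:

// A minimal sketch of offer-based scheduling, NOT Spark's real resourceOffers:
// walk each executor's offer and assign pending tasks while cores remain.
case class SimpleOffer(executorId: String, host: String, var freeCores: Int)

object OfferDemo {
  val CPUS_PER_TASK = 1  // like Spark's spark.task.cpus, 1 by default

  def assign(offers: Seq[SimpleOffer], pendingTasks: Seq[String]): Seq[(String, String)] = {
    val assignments = scala.collection.mutable.Buffer[(String, String)]()
    var remaining = pendingTasks
    var progress = true
    // Round-robin over the offers so tasks spread across executors
    while (remaining.nonEmpty && progress) {
      progress = false
      for (offer <- offers if remaining.nonEmpty && offer.freeCores >= CPUS_PER_TASK) {
        assignments += ((remaining.head, offer.executorId))
        offer.freeCores -= CPUS_PER_TASK  // mirrors the executorData.freeCores bookkeeping
        remaining = remaining.tail
        progress = true
      }
    }
    assignments.toSeq
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(SimpleOffer("exec-0", "host-a", 2), SimpleOffer("exec-1", "host-b", 1))
    println(assign(offers, Seq("task-0", "task-1", "task-2", "task-3")))
    // task-0 and task-2 land on exec-0, task-1 on exec-1; task-3 stays pending
  }
}

The point to notice is the same bookkeeping the real code does: every assignment subtracts CPUS_PER_TASK from the offer's free cores.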
In the launchTasks method:
1. Flatten the task sequence and iterate over the tasks.
2. Serialize the information of each task an executor is about to run.
If the serialized task buffer reaches the maximum RPC message size, abort the TaskSetManager; the error message advises increasing spark.rpc.message.maxSize (see the configuration sketch after the code below).
If it is under the limit, find the executor the task is assigned to and subtract the CPUs the task will use from that executor's free CPU count.
3. Send a LaunchTask message to the executor to start the task on it.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task each executor is about to run
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // Find the executor this task is assigned to
      val executorData = executorDataMap(task.executorId)
      // Subtract the CPUs the task will use from the executor's free cores
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send a LaunchTask message to the executor to start the task on it
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
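As the abort message above suggests, the ceiling is spark.rpc.message.maxSize, expressed in MiB (128 by default). If legitimate tasks trip it, you can raise it when building the SparkConf; the value 256 below is purely illustrative, and broadcast variables are usually the better fix for large closures:

import org.apache.spark.SparkConf

// Illustrative: allow RPC messages up to 256 MiB (the default is 128)
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.rpc.message.maxSize", "256")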
Sending a message to the executor really means sending a message to CoarseGrainedExecutorBackend.
(For background on CoarseGrainedExecutorBackend, see: Reading the Spark Source Code: Launching CoarseGrainedExecutorBackend)
When CoarseGrainedExecutorBackend receives the LaunchTask message, it handles it as follows:
1. First check whether the executor is null; if it is, exit the executor.
2. Otherwise, deserialize the TaskDescription and pass it to the executor's launchTask method.
// Launch the task
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Deserialize the task description
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Call the executor's launchTask method
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
Now we move to org.apache.spark.executor.Executor and look at its launchTask method. In this method:
1. A TaskRunner is created for every task. TaskRunner implements Runnable, and it is TaskRunner's run method that actually runs the task.
2. The task is put into the runningTasks map, marking it as running.
3. The TaskRunner is handed to a thread pool for execution (a standalone sketch of this pattern follows the code below).
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
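The pattern here is simply "wrap the work in a Runnable, register it, hand it to a pool". Below is a small self-contained sketch of that pattern that assumes nothing from Spark; the real Executor uses a daemon cached thread pool and a ConcurrentHashMap named runningTasks, and LaunchPatternDemo is an illustrative stand-in:

import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchPatternDemo {
  // Mirrors the Executor's thread pool and runningTasks registry
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(taskId: Long): Unit = {
    val runner: Runnable = new Runnable {
      override def run(): Unit = {
        try {
          println(s"Running task $taskId on ${Thread.currentThread.getName}")
        } finally {
          // Like TaskRunner.run, always deregister the task when it finishes
          runningTasks.remove(taskId)
        }
      }
    }
    runningTasks.put(taskId, runner)  // mark the task as running
    threadPool.execute(runner)        // the pool invokes runner.run()
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(launchTask)
    threadPool.shutdown()
  }
}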
Now the key part: TaskRunner's run method. run monitors the task's state throughout and reports state updates back to the driver.
1. First, some preparation:
(1) Record times: the deserialization start time (wall clock), the deserialization start CPU time, and the GC time so far.
(2) Set up tooling: the JVM thread-management bean, the thread context class loader, and a closureSerializer instance used to deserialize the task.
(3) Log that the task is running, as shown below:
18/07/23 16:47:05 INFO Executor: Running task 89.0 in stage 0.0 (TID 89)
(4) After logging, call CoarseGrainedExecutorBackend's statusUpdate method to tell the driver about a task state update: the task is now RUNNING.
2. Deserialize the task:
(1) Deserialize the task's dependencies.
(2) Check the dependencies; if any are missing or have been updated, fetch the updates.
(3) Deserialize the task itself.
(4) Set the task's TaskMemoryManager.
(5) Check whether the task has been killed: if it was killed before deserialization, bail out; if it is killed after deserialization, it continues executing for now.
(6) Call mapOutputTracker's updateEpoch method to update the epoch.
3. Run the task:
(1) Record the wall-clock and CPU times at which the task really starts.
(2) Call task.run to run the task and obtain its return value.
Once the return value arrives, the task's computation has finished and its result must be processed.
(3) When the task finishes running, release the locks it holds in the blockManager and free the memory it occupied.
(4) Record the wall-clock and CPU times after the task completes.
(5) Add the task's timing metrics to the metrics system (a standalone sketch of this timing pattern follows the full run method below).
4. Process the task's result:
(1) Collect the task's accumulator updates.
(2) Wrap the serialized task result and the accumulator updates into a DirectTaskResult (the task's direct result).
(3) Serialize the DirectTaskResult.
(4) Read the size of the serialized DirectTaskResult, i.e. the size of the task's result.
(5) Choose the final serialized result based on that size (summarized as a small function after this list):
In Spark, a task's result size is capped at 1 GB by default (maxResultSize), set via the spark.driver.maxResultSize configuration.
If the cap is greater than 0 and the task's result exceeds it, a warning is logged that the result is too large, and the result is dropped.
A task's direct result size is capped at 1048576 bytes by default (maxDirectResultSize); this value is the minimum of spark.task.maxDirectResultSize and spark.rpc.message.maxSize.
If the task's result exceeds maxDirectResultSize, it is written to the blockManager with storage level MEMORY_AND_DISK_SER, and a log line records that the task finished and its result was sent to the driver via the BlockManager.
A sample log line:
18/07/24 09:27:07 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 4). 3097876 bytes result sent via BlockManager
If resultSize is under the direct-result cap, a log line records that the task finished and its result was sent straight to the driver.
A sample log line:
18/07/23 16:47:05 INFO Executor: Finished task 89.0 in stage 0.0 (TID 89). 1041 bytes result sent to driver
5. Call CoarseGrainedExecutorBackend's statusUpdate method to tell the driver about a task state update: the task is now FINISHED.
6. Finally, remove the completed task from the runningTasks map.
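The three-way decision in step 4 (5) boils down to a small pure function. The sketch below is an illustrative model, not Spark API; the defaults mirror the caps described above, and ResultRoute and routeResult are made-up names:

// Illustrative model of the Executor's serializedResult decision, not Spark code.
sealed trait ResultRoute
case object DroppedTooLarge extends ResultRoute  // only a block id + size go back
case object ViaBlockManager extends ResultRoute  // IndirectTaskResult, fetched later
case object DirectToDriver extends ResultRoute   // DirectTaskResult travels in the RPC itself

def routeResult(resultSize: Long,
                maxResultSize: Long = 1L << 30,       // 1 GB, spark.driver.maxResultSize
                maxDirectResultSize: Long = 1L << 20  // min(spark.task.maxDirectResultSize,
                                                      //     spark.rpc.message.maxSize)
               ): ResultRoute = {
  if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
  else if (resultSize > maxDirectResultSize) ViaBlockManager
  else DirectToDriver
}

// e.g. routeResult(1041) == DirectToDriver; routeResult(3097876) == ViaBlockManager,
// which matches the two sample log lines above.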
Having read this part of the source, you should now recognize where these log lines come from when Spark runs.
override def run(): Unit = {
  threadId = Thread.currentThread.getId
  // Set the thread name: Executor task launch worker for task XXX
  Thread.currentThread.setName(threadName)
  // Get the JVM's thread-management bean
  val threadMXBean = ManagementFactory.getThreadMXBean
  // Create the TaskMemoryManager
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  // Record the deserialization start time
  val deserializeStartTime = System.currentTimeMillis()
  // Record the deserialization start CPU time
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  // Set the thread context class loader
  Thread.currentThread.setContextClassLoader(replClassLoader)
  // Instantiate the task serializer
  val ser = env.closureSerializer.newInstance()
  // Log: running the task
  logInfo(s"Running $taskName (TID $taskId)")
  // Call CoarseGrainedExecutorBackend's statusUpdate to report the RUNNING state
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  var taskStartCpu: Long = 0
  // Record the GC time so far
  startGCTime = computeTotalGcTime()

  try {
    // Deserialize the task's dependencies
    val (taskFiles, taskJars, taskProps, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)
    // Make the task properties available while the executor fully deserializes the task
    Executor.taskDeserializationProps.set(taskProps)
    // Fetch missing or updated dependency files and jars via updateDependencies
    updateDependencies(taskFiles, taskJars)
    // Deserialize the task itself
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    // Attach the local properties to the task
    task.localProperties = taskProps
    // Set the task's TaskMemoryManager
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task was killed before deserialization, quit now;
    // otherwise, keep executing the task
    if (killed) {
      throw new TaskKilledException
    }

    // Call mapOutputTracker's updateEpoch method to update the epoch
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Record the wall-clock time at which the task really starts
    taskStart = System.currentTimeMillis()
    // Record the CPU time at which the task really starts
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
    // Run the task and keep its return value
    val value = try {
      // Call the task's run method to run the task
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // Release the locks this task holds in the blockManager
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      // Free the memory the task allocated
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
    }
    // Record the wall-clock time at which the task finished
    val taskFinish = System.currentTimeMillis()
    // Record the CPU time at which the task finished
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L

    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw new TaskKilledException
    }

    // Instantiate the result serializer
    val resultSer = env.serializer.newInstance()
    // Wall-clock time before serializing the result
    val beforeSerialization = System.currentTimeMillis()
    // Serialize the task's computed value
    val valueBytes = resultSer.serialize(value)
    // Wall-clock time after serializing the result
    val afterSerialization = System.currentTimeMillis()

    // Add the task's timing metrics to the metrics system
    task.metrics.setExecutorDeserializeTime(
      (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    task.metrics.setExecutorDeserializeCpuTime(
      (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // We need to subtract Task.run()'s deserialization time to avoid double-counting
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    task.metrics.setExecutorCpuTime(
      (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

    // Collect the task's accumulator updates
    val accumUpdates = task.collectAccumulatorUpdates()
    // Wrap the serialized result and accumulator updates into a DirectTaskResult
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    // Serialize the DirectTaskResult
    val serializedDirectResult = ser.serialize(directResult)
    // Read the task result's size
    val resultSize = serializedDirectResult.limit

    // Decide what actually gets sent back to the driver
    val serializedResult: ByteBuffer = {
      // maxResultSize defaults to 1 GB
      // If maxResultSize > 0 and the result exceeds it, warn that it is too large
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      // If the result exceeds maxDirectResultSize
      } else if (resultSize > maxDirectResultSize) {
        // Get the blockId
        val blockId = TaskResultBlockId(taskId)
        // Write the direct result to the blockManager at level MEMORY_AND_DISK_SER
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        // Serialize a reference to the directTaskResult held in the BlockManager
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        // Otherwise return serializedDirectResult itself
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    // Call CoarseGrainedExecutorBackend's statusUpdate to report the FINISHED state
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
    // ... exception-handling code omitted ...
  } finally {
    // Remove the finished task from the runningTasks map
    runningTasks.remove(taskId)
  }
}
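The timing bookkeeping in run() uses only standard JVM management beans, so the pattern is easy to try in isolation. A minimal sketch, with a busy loop standing in for task.run (TaskTimingDemo is an illustrative name):

import java.lang.management.ManagementFactory

object TaskTimingDemo {
  def main(args: Array[String]): Unit = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    val startWall = System.currentTimeMillis()
    val startCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime  // nanoseconds
    } else 0L

    // Stand-in for task.run(): burn some CPU
    var acc = 0L
    var i = 0
    while (i < 50000000) { acc += i; i += 1 }

    val wallMs = System.currentTimeMillis() - startWall
    val cpuNs = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - startCpu
    } else 0L
    // Total GC time across all collectors, in the spirit of computeTotalGcTime
    import scala.collection.JavaConverters._
    val gcMs = ManagementFactory.getGarbageCollectorMXBeans.asScala
      .map(_.getCollectionTime).sum
    println(s"result=$acc wall=${wallMs}ms cpu=${cpuNs / 1000000}ms gcTotal=${gcMs}ms")
  }
}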
Finally, let's look at CoarseGrainedExecutorBackend's statusUpdate method and how the task's execution result is handled.
statusUpdate sends the driver a StatusUpdate message announcing the task state change.
The message carries: the executor id, the task id, the task state, and the task's result data.
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
On the driver side, CoarseGrainedSchedulerBackend receives the message and responds as follows:
1. Call the scheduler's statusUpdate method; the scheduler here is a TaskSchedulerImpl, so this calls TaskSchedulerImpl.statusUpdate.
2. If the task's state is a finished one, add the task's CPUs back to the executor's free-core count and call makeOffers so the freed resources are offered out again.
If the executor is unknown, log a warning and ignore the status update.
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
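Reading launchTasks and this StatusUpdate handler together reveals a simple invariant: a running task holds CPUS_PER_TASK of its executor's cores, subtracted at launch and returned on completion, at which point makeOffers can hand the freed cores to waiting tasks. A toy model of that bookkeeping (ExecutorBook is an illustrative name, not Spark code):

// Toy model of the driver-side core accounting, not Spark's implementation.
class ExecutorBook(totalCores: Int, cpusPerTask: Int = 1) {
  private var freeCores = totalCores

  def launch(): Boolean = {
    if (freeCores >= cpusPerTask) { freeCores -= cpusPerTask; true }  // launchTasks path
    else false                                                        // no room: task waits
  }

  def finish(): Unit = freeCores += cpusPerTask  // StatusUpdate(FINISHED) path

  def available: Int = freeCores
}

// val book = new ExecutorBook(totalCores = 4)
// book.launch(); book.launch()  // 2 of 4 cores still free
// book.finish()                 // back to 3; the driver then calls makeOffers again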
This concludes our reading of how a Task is launched.
TaskSchedulerImpl's handling of the task's execution result is the topic of the next article.
If you made it to the end, please give this post a like!