Spark源码阅读:Task的结果处理
系列导读:
想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~
6.Master注册机制-Application的注册与Application的调度算法
8.CoarseGrainedExecutorBackend的启动
10.DAGScheduler Stage划分与Task最佳位置计算
13.Task的启动
14.Task的结果处理
在Task运行完成后,向drvier发送消息,Task计算完毕,更新Task状态为已完成,接下来要对Task计算的结果进行处理,今天就来看一下Task计算结果的处理
这篇文章也是Spark应用运行系列文章的最后一篇
了解Task计算相关内容,请戳:Spark源码阅读:Task的启动
driver端,即CoarseGrainedSchedulerBackend,接收到Task完成消息后,做出如下响应:
1.调用scheduler的statusUpdate方法,这里的scheduler为TaskSchedulerImpl,即调用TaskSchedulerImpl的statusUpdate方法
2.如果task的状态为isFinished,将executor 空闲cpu信息更新,调用makeOffers将资源释放
如果不是,则打印警告日志,无视该状态更新
override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }
实则调用调用TaskSchedulerImpl的statusUpdate方法,对Task状态进行更新
该方法执行步骤如下:
1.从taskIdToTaskSetManager获取task对应的TaskSetManager
2.如果task状态是LOST,从taskIdToTaskSetManager获取task对应的exeuctor标识,如果此executor正在运行task,则移除该executor,原因是executor lost
3.如果task状态是isFinished,先调用cleanupTaskState清理此task相关缓存数据,然后调用taskResultGetter的enqueueSuccessfulTask方法,对执行成功的Task结果进行处理
如果是task状态是FAILED/KILLED/LOST则调用taskResultGetter的enqueueFailedTask方法,对执行失败的Task结果进行处理
4.如果executor的状态已经被定义,说明该executor已经被移除,则移除该executor
// 更新状态 def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None var reason: Option[ExecutorLossReason] = None synchronized { try { // 从taskIdToTaskSetManager获取task对应的TaskSetManager taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => // 如果task状态是LOST if (state == TaskState.LOST) { val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) if (executorIdToRunningTaskIds.contains(execId)) { reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) // 移除executor removeExecutor(execId, reason.get) failedExecutor = Some(execId) } } // 如果状态是isFinished if (TaskState.isFinished(state)) { // 清理task cleanupTaskState(tid) // 移除taskSet中已完成的task taskSet.removeRunningTask(tid) if (state == TaskState.FINISHED) { // 调用taskResultGetter的enqueueSuccessfulTask方法,获取成功执行的结果 taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + "likely the result of receiving duplicate task finished status updates) or its " + "executor has been marked as failed.") .format(state, tid)) } } catch { case e: Exception => logError("Exception in statusUpdate", e) } } if (failedExecutor.isDefined) { assert(reason.isDefined) dagScheduler.executorLost(failedExecutor.get, reason.get) backend.reviveOffers() } }
在statusUpdate的方法中,最重要的步骤是:第三步,调用taskResultGetter的enqueueSuccessfulTask方法,对执行成功的Task结果进行处理
下面对该方法进行阅读:
enqueueSuccessfulTask方法步骤如下:
1.对task结果进行反序列化,对task结果类型进行模式匹配
2.如果task结果类型为directResult,则结果数据存放在DirectTaskResult中,对DirectTaskResult中的数据进行反序列化
如果task的结果类型为IndirectTaskResult,则结果数据存放在blockManager中,调用blockManager的getRemoteBytes方法拉取数据,将数据反序列化
3.更新task的执行结果的累加器中的结果大小
4.调用TaskSchedulerImpl的handleSuccessfulTask方法,用于处理成功的task
def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit = { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try { val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { // 如果结果类型为directResult,说明结果存在DirectTaskResult中 case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // 获取直接结果 directResult.value(taskResultSerializer.get()) (directResult, serializedData.limit()) // 如果结果类型为IndirectTaskResult,说明结果存在blockManager中 case IndirectTaskResult(blockId, size) => if (!taskSetManager.canFetchMoreResults(size)) { // dropped by executor if size is larger than maxResultSize sparkEnv.blockManager.master.removeBlock(blockId) return } logDebug("Fetching indirect task result for TID %s".format(tid)) scheduler.handleTaskGettingResult(taskSetManager, tid) // 调用blockManager的getRemoteBytes方法拉取数据,将数据反序列化 val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { scheduler.handleFailedTask( taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get.toByteBuffer) // force deserialization of referenced value deserializedResult.value(taskResultSerializer.get()) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } // 更新task的执行结果的累加器中的结果大小 result.accumUpdates = result.accumUpdates.map { a => if (a.name == Some(InternalAccumulator.RESULT_SIZE)) { val acc = a.asInstanceOf[LongAccumulator] assert(acc.sum == 0L, "task result size should not have been set on the executors") acc.setValue(size.toLong) acc } else { a } } // 调用TaskSchedulerImpl的handleSuccessfulTask方法,处理成功的task scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader taskSetManager.abort("ClassNotFound with classloader: " + loader) case NonFatal(ex) => logError("Exception while getting task result", ex) taskSetManager.abort("Exception while getting task result: %s".format(ex)) } } }) }
下面去看TaskSchedulerImpl的handleSuccessfulTask方法:
实则是调用了taskSetManager的handleSuccessfulTask方法
def handleSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit = synchronized { taskSetManager.handleSuccessfulTask(tid, taskResult) }
关于taskSetManager的handleSuccessfulTask方法,步骤如下:
1.将taskInfos中缓存的task info标记为FINISHED
2.将task id从RunningTask集合中移除
3.调用dagScheduler的taskEnded方法,报告任务完成
4.同一个Task可能会有多种尝试运行,如果尝试成功后,其他方式的尝试都是没必要的,所以杀死其他方式尝试
5.如果successful集合中没有被存在该task成功的状态,则为运行成功的task数 +1,打印日志:task完成,并将successful集合中,该task状态更新为true,即成功
日志实例如下:
8/07/23 17:14:14 INFO TaskSetManager: Finished task 98.0 in stage 0.0 (TID 98) in 13 ms on 192.168.1.40 (executor 0) (99/100)
6.最后调用maybeFinishTaskSet,对可能完成的TaskSet进行清理
最后将整个Task的运行结果处理总结如下图:
至此,从Spark submit提交程序,再到对Task运行结果处理,我们整个Spark 应用程序的运行过程源码我们就阅读完毕了
完结,撒花!!
Spark源码系列未完待续,敬请期待。
看完了点个赞呗!