Spark源码阅读:Task的结果处理


系列导读

想要了解Spark源码的小伙伴们,请按照下面文章的顺序进行阅读哦~

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


在Task运行完成后,向drvier发送消息,Task计算完毕,更新Task状态为已完成,接下来要对Task计算的结果进行处理,今天就来看一下Task计算结果的处理

这篇文章也是Spark应用运行系列文章的最后一篇

了解Task计算相关内容,请戳:Spark源码阅读:Task的启动


driver端,即CoarseGrainedSchedulerBackend,接收到Task完成消息后,做出如下响应:

1.调用scheduler的statusUpdate方法,这里的scheduler为TaskSchedulerImpl,即调用TaskSchedulerImpl的statusUpdate方法

2.如果task的状态为isFinished,将executor 空闲cpu信息更新,调用makeOffers将资源释放

  如果不是,则打印警告日志,无视该状态更新

 override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId) case None => logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }

实则调用调用TaskSchedulerImpl的statusUpdate方法,对Task状态进行更新

该方法执行步骤如下:

1.从taskIdToTaskSetManager获取task对应的TaskSetManager

2.如果task状态是LOST,从taskIdToTaskSetManager获取task对应的exeuctor标识,如果此executor正在运行task,则移除该executor,原因是executor lost

3.如果task状态是isFinished,先调用cleanupTaskState清理此task相关缓存数据,然后调用taskResultGetter的enqueueSuccessfulTask方法,对执行成功的Task结果进行处理

    如果是task状态是FAILED/KILLED/LOST则调用taskResultGetter的enqueueFailedTask方法,对执行失败的Task结果进行处理

4.如果executor的状态已经被定义,说明该executor已经被移除,则移除该executor

  //  更新状态
  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        //  从taskIdToTaskSetManager获取task对应的TaskSetManager
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            //  如果task状态是LOST
            if (state == TaskState.LOST) {
              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
              if (executorIdToRunningTaskIds.contains(execId)) {
                reason = Some(
                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                //  移除executor
                removeExecutor(execId, reason.get)
                failedExecutor = Some(execId)
              }
            }
            //  如果状态是isFinished
            if (TaskState.isFinished(state)) {
              //  清理task
              cleanupTaskState(tid)
              //  移除taskSet中已完成的task
              taskSet.removeRunningTask(tid)
              if (state == TaskState.FINISHED) {
                //  调用taskResultGetter的enqueueSuccessfulTask方法,获取成功执行的结果
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates) or its " +
                "executor has been marked as failed.")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

在statusUpdate的方法中,最重要的步骤是:第三步,调用taskResultGetter的enqueueSuccessfulTask方法,对执行成功的Task结果进行处理

下面对该方法进行阅读:

enqueueSuccessfulTask方法步骤如下:

1.对task结果进行反序列化,对task结果类型进行模式匹配

2.如果task结果类型为directResult,则结果数据存放在DirectTaskResult中,对DirectTaskResult中的数据进行反序列化

    如果task的结果类型为IndirectTaskResult,则结果数据存放在blockManager中,调用blockManager的getRemoteBytes方法拉取数据,将数据反序列化

3.更新task的执行结果的累加器中的结果大小

4.调用TaskSchedulerImpl的handleSuccessfulTask方法,用于处理成功的task

  def enqueueSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      serializedData: ByteBuffer): Unit = {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
              // 如果结果类型为directResult,说明结果存在DirectTaskResult中
            case directResult: DirectTaskResult[_] =>
              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                return
              }
             
              //  获取直接结果
              directResult.value(taskResultSerializer.get())
              (directResult, serializedData.limit())

              //  如果结果类型为IndirectTaskResult,说明结果存在blockManager中
            case IndirectTaskResult(blockId, size) =>
              if (!taskSetManager.canFetchMoreResults(size)) {
                // dropped by executor if size is larger than maxResultSize
                sparkEnv.blockManager.master.removeBlock(blockId)
                return
              }

              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
             // 调用blockManager的getRemoteBytes方法拉取数据,将数据反序列化
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get.toByteBuffer)
              // force deserialization of referenced value
              deserializedResult.value(taskResultSerializer.get())
              sparkEnv.blockManager.master.removeBlock(blockId)
              (deserializedResult, size)
          }

          //  更新task的执行结果的累加器中的结果大小
          result.accumUpdates = result.accumUpdates.map { a =>
            if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
              val acc = a.asInstanceOf[LongAccumulator]
              assert(acc.sum == 0L, "task result size should not have been set on the executors")
              acc.setValue(size.toLong)
              acc
            } else {
              a
            }
          }
          //  调用TaskSchedulerImpl的handleSuccessfulTask方法,处理成功的task
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          case NonFatal(ex) =>
            logError("Exception while getting task result", ex)
            taskSetManager.abort("Exception while getting task result: %s".format(ex))
        }
      }
    })
  }

下面去看TaskSchedulerImpl的handleSuccessfulTask方法:

实则是调用了taskSetManager的handleSuccessfulTask方法

  def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

关于taskSetManager的handleSuccessfulTask方法,步骤如下:

1.将taskInfos中缓存的task info标记为FINISHED

2.将task id从RunningTask集合中移除

3.调用dagScheduler的taskEnded方法,报告任务完成

4.同一个Task可能会有多种尝试运行,如果尝试成功后,其他方式的尝试都是没必要的,所以杀死其他方式尝试

5.如果successful集合中没有被存在该task成功的状态,则为运行成功的task数 +1,打印日志:task完成,并将successful集合中,该task状态更新为true,即成功

日志实例如下:

8/07/23 17:14:14 INFO TaskSetManager: Finished task 98.0 in stage 0.0 (TID 98) in 13 ms on 192.168.1.40 (executor 0) (99/100)

6.最后调用maybeFinishTaskSet,对可能完成的TaskSet进行清理

最后将整个Task的运行结果处理总结如下图:


至此,从Spark submit提交程序,到对Task运行结果处理,我们整个Spark 应用程序的运行过程源码我们就阅读完毕了

完结,撒花!!


Spark源码系列未完待续,敬请期待。

看完了点个赞呗!



赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

邮箱地址不会被公开。