Spark Source Code Reading: Task Launch


Series Guide

If you want to dig into the Spark source code, please read the articles in this series in the order below~

1. Spark Submit: Job Submission

2. Driver Registration and Startup

3. SparkContext Initialization

4. Creating and Starting the TaskScheduler

5. Application Registration

6. The Master's Registration Mechanism: Application Registration and the Application Scheduling Algorithm

7. Spark Source Code Reading: Executor Startup

8. CoarseGrainedExecutorBackend Startup

9. DAGScheduler Source Code Analysis

10. DAGScheduler: Stage Division and Computing the Best Task Locations

11. TaskScheduler Scheduling Algorithms and Scheduling Pools

12. TaskScheduler Task Submission and Resource Allocation

13. Task Launch

14. Task Result Handling


The previous article covered TaskScheduler's submitTasks method, which submits tasks. Inside it, the backend's reviveOffers method is called to allocate resources to the tasks; once resources are allocated, the tasks are launched.

(For the details, see: Spark Source Code Reading: TaskScheduler Task Submission and Resource Allocation)

The whole task submission flow is summarized in the figure below.

Today we read the source code for the final step: launching the task.

CoarseGrainedSchedulerBackend's makeOffers method has four main steps:

1. Filter out dead executors.

2. Wrap all of this application's alive executors into WorkerOffers; each WorkerOffer describes the CPU resources (free cores) available on one executor (a sketch of this class follows the code below).

3. Call the scheduler's resourceOffers method with the workOffers; the scheduler here is TaskSchedulerImpl, so this invokes TaskSchedulerImpl's resourceOffers method.

4. Take the sequence of tasks that resourceOffers returns with resources already assigned and pass it to launchTasks, which launches the tasks.

    private def makeOffers() {
      //  Filter out dead executors
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      //  Wrap all of this application's alive executors into WorkerOffers; each one describes one executor's free CPU cores
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      //  Call scheduler.resourceOffers with the workOffers
      //  and pass the returned tasks (with resources assigned) to launchTasks
      launchTasks(scheduler.resourceOffers(workOffers))
    }
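For context, the WorkerOffer built above is just a small container describing one executor's free resources. A rough sketch of its shape (simplified; the fields are inferred from the usage above, so treat this as an illustration rather than the exact source):

    //  Simplified sketch: a WorkerOffer only carries the executor id, its host,
    //  and the number of free CPU cores available for scheduling
    case class WorkerOffer(executorId: String, host: String, cores: Int)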

In the launchTasks method:

1. Flatten the task sequence and iterate over it.

2. Serialize each task's information (its TaskDescription).

If the serializedTask buffer exceeds the maximum RPC message size, abort the TaskSetManager and suggest increasing spark.rpc.message.maxSize (a config sketch follows the code below).

If it is within the limit, find the executor the task was assigned to and subtract the CPUs the task will use from that executor's free cores.

3. Send a LaunchTask message to the executor so the task is started there.

 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        //  Serialize each task's TaskDescription
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //  Find the executor this task was assigned to
          val executorData = executorDataMap(task.executorId)
          //  Subtract the CPUs this task will use from the executor's free cores
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          //  Send a LaunchTask message to the executor to start the task there
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
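If you ever hit the abort branch above, the RPC limit can be raised when building the SparkConf. This is only a hedged sketch: spark.rpc.message.maxSize is specified in MB (default 128 in Spark 2.x), and the value 256 below is an arbitrary example, not a recommendation.

    //  Sketch: raising the RPC message size limit so that large serialized tasks
    //  are not aborted by launchTasks (value is illustrative only)
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("large-task-demo")            // hypothetical application name
      .set("spark.rpc.message.maxSize", "256")  // in MB; default is 128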

Sending a message to the executor actually means sending it to the CoarseGrainedExecutorBackend.

For more on CoarseGrainedExecutorBackend, see: Spark Source Code Reading: CoarseGrainedExecutorBackend Startup.

When CoarseGrainedExecutorBackend receives the LaunchTask message, it does the following:

1. First check whether the executor is null; if it is, exit the executor process.

2. Otherwise, deserialize the TaskDescription and call the executor's launchTask method with the deserialized task information (a sketch of TaskDescription follows the code below).

      // Launch the task
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        //  Deserialize the TaskDescription
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)

        //  Call the executor's launchTask method
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
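The TaskDescription deserialized here bundles everything the executor needs to run one task attempt. Roughly, it looks like the following sketch (simplified from the Spark 2.1-era class; the exact field set may differ between versions):

    //  Simplified sketch of TaskDescription's main fields (illustrative only)
    import java.nio.ByteBuffer

    class TaskDescription(
        val taskId: Long,                // unique task attempt id (the TID seen in logs)
        val attemptNumber: Int,          // attempt number of this task
        val executorId: String,          // executor the task was assigned to
        val name: String,                // e.g. "task 89.0 in stage 0.0"
        val index: Int,                  // partition index within the TaskSet
        val serializedTask: ByteBuffer)  // the serialized Task itself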

Now we move to org.apache.spark.executor.Executor

and look at its launchTask method. In this method:

1. A TaskRunner is created for each task. TaskRunner implements Runnable, and its run method is what actually runs the task.

2. The task is put into the runningTasks map, marking it as currently running.

3. The TaskRunner is handed to a thread pool for execution (a sketch of such a pool follows the code below).

 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }
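The threadPool used above is a daemon, cached thread pool, so every task attempt gets its own "Executor task launch worker" thread. A minimal sketch of such a pool, using plain java.util.concurrent rather than Spark's internal helper:

    //  Minimal sketch of a daemon cached thread pool like the executor's task-launch pool;
    //  each TaskRunner submitted to it runs on its own worker thread
    import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

    val taskLaunchPool: ExecutorService = Executors.newCachedThreadPool(
      new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "Executor task launch worker")
          t.setDaemon(true) // daemon threads never block JVM shutdown
          t
        }
      })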

The key part is TaskRunner's run method. Throughout its execution it tracks the task's state and reports status updates back to the driver.

1. First, some preparation:

    (1) Record timing: the deserialization wall-clock time, deserialization CPU time, and GC time.

    (2) Set up utilities: the JVM thread MXBean, the thread context class loader, and a closureSerializer instance used to deserialize the task.

    (3) Log that the task is running, for example:

18/07/23 16:47:05 INFO Executor: Running task 89.0 in stage 0.0 (TID 89)

    (4) After logging, call CoarseGrainedExecutorBackend's statusUpdate method to tell the driver that the task's state is now RUNNING.

2. Deserialize the task:

    (1) Deserialize the task's dependencies.

    (2) Check those dependencies; if any files or JARs are missing or have been updated, fetch and update them.

    (3) Deserialize the task itself.

    (4) Set a TaskMemoryManager for the task.

    (5) Check whether the task has been killed: if it was killed before deserialization, throw TaskKilledException and exit; if it is killed after deserialization, execution continues.

    (6) Call mapOutputTracker's updateEpoch method to update the epoch.

3. Run the task:

    (1) Record the wall-clock and CPU time at which the task actually starts.

    (2) Call task.run to execute the task and obtain its return value.

Once the return value is obtained, the task has finished and its result needs to be processed.

    (3) After the task finishes, release the BlockManager locks held by the task and free the memory it allocated.

    (4) Record the wall-clock and CPU time after the task completes.

    (5) Add the task's timing metrics to the metrics system.

4. Handle the task result:

    (1) Collect the task's accumulator updates.

    (2) Wrap the serialized task result and the accumulator updates in a DirectTaskResult object.

    (3) Serialize the DirectTaskResult.

    (4) Get the size of the serialized DirectTaskResult, i.e. the task result size.

    (5) Choose the final serialized result based on that size (a config sketch follows this list):

        In Spark, the maximum result size (maxResultSize) defaults to 1 GB and is set by the configuration spark.driver.maxResultSize.

        If maxResultSize is greater than 0 and the task result exceeds it, log a warning that the result is too large and drop it.

        The maximum direct result size (maxDirectResultSize) defaults to 1048576 bytes; it is the minimum of spark.task.maxDirectResultSize and spark.rpc.message.maxSize.

        If the task result is larger than maxDirectResultSize, write it to the BlockManager with storage level MEMORY_AND_DISK_SER and log that the task finished and its result was sent to the driver via the BlockManager.

        Sample log:

18/07/24 09:27:07 INFO executor.Executor: Finished task 0.0 in stage 3.0 (TID 4). 3097876 bytes result sent via BlockManager

        If resultSize is below maxDirectResultSize, log that the task finished and its result is sent directly to the driver.

        Sample log:

18/07/23 16:47:05 INFO Executor: Finished task 89.0 in stage 0.0 (TID 89). 1041 bytes result sent to driver

5. Call CoarseGrainedExecutorBackend's statusUpdate method again to tell the driver that the task's state is now FINISHED.

6. Finally, remove the finished task from the runningTasks map.
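As promised above, here is a hedged sketch of the two result-size thresholds used in step 4; the values below are illustrative assumptions, not recommendations:

    //  Sketch of the two result-size thresholds discussed above
    //  spark.driver.maxResultSize: cap on the total serialized result size (default 1g, 0 = unlimited)
    //  spark.task.maxDirectResultSize: results above this many bytes go through the BlockManager
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.driver.maxResultSize", "2g")           // illustrative value
      .set("spark.task.maxDirectResultSize", "1048576")  // 1 MB, the default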

Having read this part of the source code, you should now know where these log lines come from when a Spark job runs.

 override def run(): Unit = {
      threadId = Thread.currentThread.getId
      //  Set the thread name: Executor task launch worker for task XXX
      Thread.currentThread.setName(threadName)
      //  Get the JVM's thread management MXBean
      val threadMXBean = ManagementFactory.getThreadMXBean
      //  Create the TaskMemoryManager for this task
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      //  Record the deserialization start time
      val deserializeStartTime = System.currentTimeMillis()
      //  Record the deserialization start CPU time
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      //  Set the thread context class loader
      Thread.currentThread.setContextClassLoader(replClassLoader)
      //  Instantiate the closure serializer used to deserialize the task
      val ser = env.closureSerializer.newInstance()
      //  Log: running the task
      logInfo(s"Running $taskName (TID $taskId)")
      //  Call CoarseGrainedExecutorBackend's statusUpdate to report state RUNNING
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStart: Long = 0
      var taskStartCpu: Long = 0
      //  Record the GC time so far
      startGCTime = computeTotalGcTime()

      try {
        //  Deserialize the task's dependencies
        val (taskFiles, taskJars, taskProps, taskBytes) =
          Task.deserializeWithDependencies(serializedTask)

        //  Make the task's properties available while the task is fully deserialized
        Executor.taskDeserializationProps.set(taskProps)

        //  Fetch any missing or updated dependency files and JARs via updateDependencies
        updateDependencies(taskFiles, taskJars)
        //  Deserialize the task itself
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
        //  Set the task's local properties
        task.localProperties = taskProps
        //  Set the task's TaskMemoryManager
        task.setTaskMemoryManager(taskMemoryManager)

        //  If this task was killed before deserialization, exit now;
        //  otherwise, continue running the task
        if (killed) {
          throw new TaskKilledException
        }
        //  Call mapOutputTracker's updateEpoch method to update the epoch
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch) 

        //  Record the task's start time
        taskStart = System.currentTimeMillis()
        //  Record the task's start CPU time
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        //  Get the value computed by the task
        val value = try {
          //  Call the task's run method to actually run the task
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } finally {
          //  Release all BlockManager locks held by this task
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          //  Free all the memory allocated by this task
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
          //  (the memory-leak / lock-leak checks from the real source are omitted here)
        }

        //  Record the task's finish time
        val taskFinish = System.currentTimeMillis()
        //  Record the task's finish CPU time
        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L

        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw new TaskKilledException
        }

        //  Instantiate the result serializer
        val resultSer = env.serializer.newInstance()
        //  Time before result serialization
        val beforeSerialization = System.currentTimeMillis()
        //  Serialize the task's computed result
        val valueBytes = resultSer.serialize(value)
        //  Time after result serialization
        val afterSerialization = System.currentTimeMillis()

        //  Add the task's timing metrics to the metrics system
        task.metrics.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        task.metrics.setExecutorDeserializeCpuTime(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        task.metrics.setExecutorCpuTime(
          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
        task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

        //  Collect the task's accumulator updates
        val accumUpdates = task.collectAccumulatorUpdates()
        //  Wrap the serialized result and accumulator updates in a DirectTaskResult
        val directResult = new DirectTaskResult(valueBytes, accumUpdates)
        //  Serialize the DirectTaskResult
        val serializedDirectResult = ser.serialize(directResult)
        //  Get the task result size
        val resultSize = serializedDirectResult.limit

        //  Decide which serialized result to send back to the driver
        val serializedResult: ByteBuffer = {
          //  maxResultSize defaults to 1 GB
          //  If maxResultSize > 0 and the result exceeds it, log a warning: the result is too large and is dropped
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
            //  If the result is larger than maxDirectResultSize
          } else if (resultSize > maxDirectResultSize) {
            //  Build the blockId for the task result
            val blockId = TaskResultBlockId(taskId)
            //  Write the direct result to the BlockManager with storage level MEMORY_AND_DISK_SER
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            //  Serialize a reference (IndirectTaskResult) to the result stored in the BlockManager
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            //  Otherwise, return serializedDirectResult itself
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        //  Call CoarseGrainedExecutorBackend's statusUpdate to report state FINISHED
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        //  Exception handling code omitted ......
      } finally {
        //  Remove the finished task from the runningTasks map
        runningTasks.remove(taskId)
      }
    }
  }

Finally, let's look at how CoarseGrainedExecutorBackend's statusUpdate method reports the task's execution result.

In statusUpdate, a StatusUpdate message is sent to the driver to report the task's new state.

Message contents: the executor id, task id, task state, and the task result data (a sketch of this message follows the code below).

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }
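For reference, the StatusUpdate message built above is a simple case class carrying exactly the fields listed. A rough sketch of its shape (types approximated; in Spark the state is a TaskState and the payload is wrapped in a SerializableBuffer):

    //  Rough sketch of the StatusUpdate message's fields (illustrative only)
    import java.nio.ByteBuffer

    case class StatusUpdate(
        executorId: String,  // which executor the update comes from
        taskId: Long,        // which task attempt is being reported
        state: String,       // RUNNING / FINISHED / FAILED ... (a TaskState in Spark)
        data: ByteBuffer)    // the serialized task result (may be empty)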

On the driver side, CoarseGrainedSchedulerBackend receives the message

and responds as follows:

1. Call the scheduler's statusUpdate method; the scheduler here is TaskSchedulerImpl, so this invokes TaskSchedulerImpl's statusUpdate method.

2. If the task's state is a finished state, add the task's CPUs back to the executor's free cores and call makeOffers so the freed resources can be offered to pending tasks.

  If the executor is unknown, log a warning and ignore the status update.

 override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

That concludes our reading of the whole task launch process.

How TaskSchedulerImpl handles the task's execution result will be covered in the next article.

If you made it this far, please give it a like!!!

赫墨拉

I'm a rookie who loves big data; this blog is where I share my growth and experiences.
