Spark源码阅读: Spark Submit任务提交

翻了翻之前写的文章,发现思路有一些问题,文章前后有的串不起来,近期会对之前的文章进行顺序调整和改写。 今天,重新来开个头。
作为一名Spark使用者,我对Spark任务从开始到结束都抱着一个好奇心。
想研究一下Spark内部是怎么运行的,究竟其中有什么奥秘,借鉴了前辈们的思路,开始的自己的源码阅读。

Spark源码版本: 2.1.1

Spark源码包: Spark Core

Cluster Mode: Standalone

Spark集群版本:HDP 2.6.0.3  Spark 2.1.0


概述

了解Spark任务执行的细节,最先看的就是官方文档,官方文档中简要概述了Spark在集群上的运行方式,以便容易理解所涉及的组件

Spark应用程序在集群上以独立的进程集运行,整个的任务执行过程如下:

Spark cluster components

1.用户提交编写的程序(作为Driver程序)初始化SparkContext对象,SparkContext负责协调应用程序在集群上运行

2.想要在集群上运行, SparkContext需要连接到集群管理器Cluster Manager,申请资源,注册Application

集群管理器有多种:Spark独立集群管理器,即Standalone,除此之外还有Mesos、YARN

集群管理器负责在应用程序之间分配资源

3.连接到集群管理器后,根据申请的资源,在集群中的Worker节点上创建Executor

4.创建好Executor后,Executor将信息发送给Driver

5.SparkContext初始化过程中创建并启动DAGScheduler将用户编写的程序转化为Task任务,将Task任务发送给指定Executor,进行任务计算 

6.将Task计算结果返回Driver,Spark任务计算完毕,一系列处理关闭Spark任务。

那么,接下来的文章,会对整个任务执行流程进行源码阅读

整个系列导读如下,可以按照序号顺序进行阅读:

1.Spark Submit任务提交

2.Driver的注册与启动

3.SparkContext的初始化

4.TaskScheduler的创建和启动

5.Application的注册

6.Master注册机制-Application的注册与Application的调度算法

7.Spark源码阅读:Executor的启动

8.CoarseGrainedExecutorBackend的启动

9.DAGScheduler源码分析

10.DAGScheduler Stage划分与Task最佳位置计算

11.TaskScheduler调度算法与调度池

12.TaskScheduler任务提交与资源分配

13.Task的启动

14.Task的结果处理


提交程序

从Spark任务的第一步开始,如何提交用户编写的程序?

同样,在官方文档中也有介绍如何提交应用程序

使用$SPARK_HOME/bin目录下的 spark-submit 脚本去提交用户的程序

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

如上,列出了一些常用选项:

–class:应用程序的入口(例如 org.apache.spark.examples.SparkPi

–master:master地址,这是集群中master的URL地址(例如 spark://192.168.1.10:7077 )

–deploy-mode:部署模式,是否将用户的Driver程序部署到集群的Worker节点(cluster集群模式),或将本地作为外部client客户端模式(默认为client客户端模式)

–conf:spark 配置,键-值对形式

application-jar:用户程序Jar包路径

application-arguments:用户应用程序所需参数

一个spark-submit实例如下:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://192.168.1.20:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 2G \
  --total-executor-cores 5 \
  /path/to/examples.jar \
  1000

–class 指定程序入口为org.apache.spark.examples.SparkPi

–master 指定master url地址为spark://192.168.1.20:7077

–deploy-mode 指定部署模式为cluster

–supervise 在程序执行失败后,重新启动application

–executor-memory 2G    每个executor的内存为2G

–total-executor-cores  5  executor的cpu core总数为5

/path/to/example.jar    程序的jar包

1000    程序所需参数

提交这个spark任务,命令行日志如下:

[root@louisvv bin]# ./spark-submit --class org.apache.spark.examples.SparkPi --master spark://192.168.1.20:7077 --deploy-mode cluster  --executor-memory 2G --total-executor-cores 5 ../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar 1000
Running Spark using the REST application submission protocol.
18/04/19 17:03:29 INFO RestSubmissionClient: Submitting a request to launch an application in spark://192.168.1.20:7077.
18/04/19 17:03:40 WARN RestSubmissionClient: Unable to connect to server spark://192.168.1.20:7077.
Warning: Master endpoint spark://192.168.1.20:7077 was not a REST server. Falling back to legacy submission gateway instead.

application运行完成


但报了一个WARN,说spark://192.168.1.20:7077不是一个REST服务,使用传统的提交网关

这个WARN,会在下面会进行解释


脚本分析

从Spark-submit这个脚本作为入口,脚本最后调用exec执行 “${SPARK_HOME}”/bin/spark-class  调用class为:org.apache.spark.deploy.SparkSubmit  “$@”为脚本执行的所有参数

即–class org.apache.spark.examples.SparkPi \
  –master spark://192.168.1.20:7077 \
  –deploy-mode cluster \
  –supervise \
  –executor-memory 2G \
  –total-executor-cores 5 \
  /path/to/examples.jar \
  1000

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi  # disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

看一下spark-class脚本执行步骤:

1.首先校验$SPARK_HOME/conf,spark相关依赖目录$SPARK_HOME/jars,hadoop相关依赖目录$HADOOP_HOEM/lib

2.将校验所得所有目录地址拼接为LAUNCH_CLASSPATH变量

3.将$JAVA_HOME/bin/java 定义为RUNNER变量

4.调用build_command()方法,创建执行命令

5.把build_command()方法创建的命令,循环加到数组CMD中,最后执行exec执行CMD命令

-z 判断SPARK_HOME变量的长度是否为0,等于0为真
if [ -z "${SPARK_HOME}" ]; then
加载当前目录的find...的变量
  source "$(dirname "$0")"/find-spark-home
fi
加载这个文件的变量
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
-n 判断变量长度是否不为0,不为0为真
if [ -n "${JAVA_HOME}" ]; then
JAVAHOME存在就赋值RUNNER为这个
  RUNNER="${JAVA_HOME}/bin/java"
else
监测java命令是否存在
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
不存在退出
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
判断${SPARK_HOME}/jars目录是否存在,存在为真
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

判断下边俩,不都存在就报错退出
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
存在就变量赋值
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

HADOOP_LZO_JAR=
HADOOP_LZO_DIR="/usr/hdp/${HDP_VERSION}/hadoop/lib"
获取那个目录下匹配hadoop-lzo.*${HDP_VERSION}\.jar的jar包数量
num_jars="$(ls -1 "$HADOOP_LZO_DIR" | grep "^hadoop-lzo.*${HDP_VERSION}\.jar$" | wc -l)"
如果数量为0,把那个变量弄为空
if [ "$num_jars" -eq "0" -a -z "$HADOOP_LZO_JAR" ]; then
  HADOOP_LZO_JAR=
大于1报错退出
elif [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Hadoop lzo jars in $HADOOP_LZO_DIR:" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
等于1赋值
elif [ "$num_jars" -eq "1" ]; then
  LZO_JARS="$(ls -1 "$HADOOP_LZO_DIR" | grep "^hadoop-lzo-.*${HDP_VERSION}\.jar$" || true)"
  HADOOP_LZO_JAR="${HADOOP_LZO_DIR}/${LZO_JARS}"
fi

export _HADOOP_LZO_JAR="${HADOOP_LZO_JAR}"

# Add the launcher build dir to the classpath if requested.
这变量长度大于1赋值
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
这变量长度大于1 unset目录权限
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

build_command() {
执行命令获取
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
输出返回值
  printf "%d\0" $?
}

创建数组
CMD=()
把build_commands输出结果,循环加到数组CMD中
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

数组长度
COUNT=${#CMD[@]}
数组长度-1
LAST=$((COUNT - 1))
数组的最后一个值,也就是上边$?的值
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

如果返回值不是数字,退出
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

如果返回值不为0,退出,返回返回值
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD还是原来那些参数,$@
CMD=("${CMD[@]:0:$LAST}")
执行这些
exec "${CMD[@]}"

最终,执行的CMD命令如下:

/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ 
org.apache.spark.deploy.SparkSubmit \
--master spark://192.168.1.20:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--executor-memory 2G \
--total-executor-cores 5 \
../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar \
1000

源码解析

最终执行的命令中,指定了程序的入口为org.apache.spark.deploy.SparkSubmit,来看一下它的主函数

根据解析后参数action进行模式匹配,如果是submit操作,则调用submit方法

  def main(args: Array[String]): Unit = {
    //  SparkSubmitArguments继承了SparkSubmitArgumentsParser,对提交参数进行解析
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    //  对appArgs的action进行模式匹配
    appArgs.action match {
        //  如果是SUBMIT,则调用submit
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      //  如果是KILL,则调用kill
      case SparkSubmitAction.KILL => kill(appArgs)
      //  如果是REQUEST_STATUS,则调用requestStatus
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

submit方法中,首先调用prepareSubmitEnvironment方法,准备submit环境

 private def submit(args: SparkSubmitArguments): Unit = {

    //  首先调用prepareSubmitEnvironment
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

prepareSubmitEnvironment方法中做了如下操作:

1.根据参数中master和delpoy-mode,设置对应的clusterManager和部署模式

2.再根据args中的其他参数,设置相关childArgs, childClasspath, sysProps, childMainClass,并返回结果

 //  根据参数的master,设置对应的集群资源管理器
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        printWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
      case _ =>
        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
        -1
    }

    //  根据参数的deployMode,设置部署模式
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
    }

    //  在standalone cluster模式,使用Rest client提交application
    //  Rest client提交,根据useRest进行判断,useRest为True为RestSubmissionClient方式提交application,否则为Client方式提交
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        // 在传统的standalone集群模式中,使用Client作为用户类的包装器
        childMainClass = "org.apache.spark.deploy.Client"
        //  如果参数中有设置supervise,则childArgs中添加supervise相关参数
        if (args.supervise) { childArgs += "--supervise" }
        //  获取参数中对driverMemory,driverCores的配置参数,将其添加到childArgs中
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

prepareSubmitEnvironment完成后,需要判断是否为Standalone Cluster模式和是否设置了useRest

在standalone集群模式下,有两个提交网关:
1.使用org.apache.spark.deploy.Client作为包装器来使用传统的RPC网关
2.Spark 1.3中引入的基于rest的网关
第二种方法是Spark 1.3的默认行为,但是Spark submit将会失败
如果master不是一个REST服务器,那么它将无法使用REST网关

在上面执行spark-submit 提交SparkPi任务时,报出的WARN,就是在提交网关这里啦!

 /*  在standalone集群模式下,有两个提交网关:
    *   1.使用org.apache.spark.deploy.Client作为包装器来使用传统的RPC网关
    *   2.Spark 1.3中引入的基于rest的网关
    *   第二种方法是Spark 1.3的默认行为,但是Spark submit将会失败
    *   如果master不是一个REST服务器,那么它将无法使用REST网关。
    */
    if (args.isStandaloneCluster && args.useRest) {
      try {
        printStream.println("Running Spark using the REST application submission protocol.")
        // 调用doRunMain()方法
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
      // 其他模式,直接调用doRunMain方法
    } else {
      doRunMain()
    }

接着看一下doRunMain方法,其实调用了runMain方法

    def doRunMain(): Unit = {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }

runMain通过反射mainMethod.invoke执行该方法

当deploy mode为client时,执行用户自己编写的主方法

当deploy mode为cluster时,需要判断是否为REST提交,如果是则执行org.apache.spark.rest.RestSubmissionClient的主方法,如果不是则执行org.apache.spark.deploy.Client的主方法

 private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    
    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    //  使用URLClassLoader加载jar包
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }  mainClass = Utils.classForName(childMainClass)

    //  获取mainClass的main方法
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

      //  执行该方法,传入参数数组
    mainMethod.invoke(null, childArgs.toArray)  }

VV没用过REST方式提交,就来看传统的org.apache.spark.deploy.Client提交方式

Client用于启动和终止独立集群中的Driver程序

 def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println


    val conf = new SparkConf()
    //  处理传入的参数
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    //  创建rpcEnv
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    //  获取RpcEndpointRef,用于和Master进行通讯
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))

    //  注册RpcEndpoint,调用onStart方法
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }

这个ClientEndpoint用于将消息转发给Driver程序的代理

启动时调用其onStart方法,该方法执行内容如下:

1.根据cmd进行模式匹配,如果命令为launch,获取driver额外的java依赖、classpath、java配置

2.在提交spark任务时,可添加一些额外的参数对driver进行额外的配置:

spark.driver.extraClassPath     driver的额外classpath

spark.driver.extraLibraryPath    driver的额外lib路径

spark.driver.extraJavaOptions    driver的额外java配置,jvm相关配置

在onStart方法中,第二个步骤就是,获取的driver额外参数的配置

3.将获取到的额外配置和driver参数封装为command对象,即在命令行启动时执行的shell命令

4.将command和driver启动信息封装为driverDescription对象,该对象为driver的基本信息描述,调用RequestSubmitDriver方法,将driver相关信息发送给Master,向Master申请注册Drvier 

override def onStart(): Unit = {
    //  根据driverArgs.cmd进行模式匹配
    driverArgs.cmd match {
      case "launch" =>    //如果是启动
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts

        //  将classPathEntries,libraryPathEntries,javaOpts,drvierArgs信息封装成Command
        //  这里的mainClass为org.apache.spark.deploy.worker.DriverWrapper
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)

        //  将drvierArgs,command信息封装成DriverDescription
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
          //  向master发送RequestSubmitDriver,注册Driver
          RequestSubmitDriver(driverDescription))

      case "kill" =>   
        val driverId = driverArgs.driverId
        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }

在向master发送消息后,整个的Spark-Sumit任务提交就完成了,接下来就是等待master返回driver的注册结果,启动driver,driver的注册请访问:Spark源码阅读:Driver的注册与启动
最后,
将整个Spark Submit过程总结如下图:


如有错误,请指出哦~

我们下篇文章见~

亲,看完了点个赞呗!


赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

8 Responses

  1. word count说道:

    写的不错

  2. xmx说道:

    现在是2018.08.13你讲的方法好多都变了,也对不上了,无奈spark更新太快了···作者有没有时间写写spark2.3的··有许多变化的,我自己是看不懂··

  3. 寇毅说道:

    你好。 我想问一件事。 Hello。

  4. 陈少说道:

    qqqq红包拿来pppp

  5. Norine Beadle说道:

    aaa快乐是大家的aaa

  6. 匿名说道:

    所以spark://Desktop:7077 was not a REST server. Falling back to legacy submission gateway instead
    这个问题怎么解决呢?
    没有讲清楚啊,难道是提交任务的时候加入useRest这个参数吗?

发表评论

邮箱地址不会被公开。