Spark Source Code Reading: Worker Startup


Today let's talk about how the Worker starts up.

When we want to start all Worker nodes in a Spark cluster, we can run the following script:

$SPARK_HOME/sbin/start-slaves.sh

After the script runs, it tells us, for each node, that a Worker is being started and where its log is written:

ai-node4: starting org.apache.spark.deploy.worker.Worker, logging to /var/log/spark2/spark-root-org.apache.spark.deploy.worker.Worker-1-ai-node4.out
ai-node3: starting org.apache.spark.deploy.worker.Worker, logging to /var/log/spark2/spark-root-org.apache.spark.deploy.worker.Worker-1-ai-node3.out
ai-main: starting org.apache.spark.deploy.worker.Worker, logging to /var/log/spark2/spark-root-org.apache.spark.deploy.worker.Worker-1-ai-main.out 

Let's see what actually happens when start-slaves.sh is executed.

The script does four things:

1. Set up the Spark environment variable: check whether SPARK_HOME is set; if not, derive it from the script location and export it.

2. Determine the Master port: if no Master port is configured, default it to 7077.

3. Determine the Spark Master hostname: if SPARK_MASTER_HOST is not set, use the local hostname.

4. Start the worker nodes: through slaves.sh, cd into SPARK_HOME on each node and run the sbin/start-slave.sh script, passing it the Spark Master URL.

Step 4 looks like this:

/data/software/spark-2.1.0/sbin/slaves.sh cd /data/software/spark-2.1.0 ';' /data/software/spark-2.1.0/sbin/start-slave.sh spark://192.168.1.47:7077

The code of start-slaves.sh is as follows:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"

# Find the port number for the master
if [ "$SPARK_MASTER_PORT" = "" ]; then
  SPARK_MASTER_PORT=7077
fi

if [ "$SPARK_MASTER_HOST" = "" ]; then
  case `uname` in
      (SunOS)
	  SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
	  ;;
      (*)
	  SPARK_MASTER_HOST="`hostname -f`"
	  ;;
  esac
fi

# Launch the slaves
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"

As we can see, start-slaves.sh ends up calling start-slave.sh. Let's take a look at start-slave.sh, which does the following:

1. Validate SPARK_HOME: if the environment variable is not set, derive it from the script location and export it.

2. Set the worker class to load: org.apache.spark.deploy.worker.Worker.

3. Validate the arguments passed to start-slave.sh; if they are invalid, print the usage message.

4. Source the ${SPARK_HOME}/sbin/spark-config.sh script, which adds the Spark configuration directory to the environment and puts the PySpark libraries on the Python path.

5. Source the ${SPARK_HOME}/bin/load-spark-env.sh script, which loads all of the settings in spark-env.sh into the environment and determines the Scala version Spark should use.

6. Call the start_instance function to start the Worker.

The start_instance function does the following:

(1) Determine the Worker Web UI port: if no Web UI port is specified, the default is 8081; if the machine runs multiple Worker instances, each instance gets 8081 plus its instance number minus 1.

    For example, the second Worker instance on a node gets Web UI port 8081 + 2 - 1 = 8082.

(2) Call the ${SPARK_HOME}/sbin/spark-daemon.sh script with: the action (start), the Worker class to load, the Worker instance number, and the Web UI port computed above.

The final command executed looks like this:

/data/software/spark-2.1.0/sbin/spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 8081 spark://192.168.1.47:7077

The code of start-slave.sh is as follows:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

CLASS="org.apache.spark.deploy.worker.Worker"

if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-slave.sh [options] <master>"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

. "${SPARK_HOME}/sbin/spark-config.sh"

. "${SPARK_HOME}/bin/load-spark-env.sh"

MASTER=$1
shift

# Determine desired worker port
if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
  SPARK_WORKER_WEBUI_PORT=8081
fi

function start_instance {
  WORKER_NUM=$1
  shift

  if [ "$SPARK_WORKER_PORT" = "" ]; then
    PORT_FLAG=
    PORT_NUM=
  else
    PORT_FLAG="--port"
    PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
  fi
  WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))

  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
}

if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
  start_instance 1 "$@"
else
  for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
    start_instance $(( 1 + $i )) "$@"
  done
fi

Next let's look at the spark-daemon.sh script. It is fairly long, so we will only cover the parts we care about.

The script matches on the action it is given. Our action is start, so it calls the run_command function, which does the following:

1. Create the Spark pid directory, used to store the pid file.

2. Check whether the pid file already exists. If it does, a Spark process is already running, the following message is printed, and the script stops:

org.apache.spark.deploy.worker.Worker running as process 160590.  Stop it first.

If the pid file does not exist, the following is printed:

starting org.apache.spark.deploy.worker.Worker, logging to /data/software/spark-2.1.0/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-ai-main.out

3. The Master host logs into each node over SSH, and this is where the Worker is actually started: the $SPARK_HOME/bin/spark-class script is invoked to load the Worker class and launch it.

The command executed is:

nohup nice -n 0 /data/software/spark-2.1.0/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://192.168.1.47:7077

That's it for the scripts.


Now let's look at the main method of org.apache.spark.deploy.worker.Worker.

The main method does a few things:

1. Create a SparkConf object.

2. Parse the arguments passed in from the shell.

3. Pass those arguments to startRpcEnvAndEndpoint, which creates the Worker's RpcEnv and Endpoint and starts them.

(A quick aside: Worker extends ThreadSafeRpcEndpoint, so a Worker is itself an RpcEndpoint and can communicate with other RpcEndpoints; see the sketch after the main method below.)

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    // Pass the parsed arguments (host, port, webUiPort, cores, memory, masters, workDir, conf) in, create the RpcEnv and Endpoint, and start them
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }
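
Before going further, here is a minimal sketch of what an RpcEndpoint looks like, to make the RpcEnv/Endpoint relationship from the aside above concrete. This is not Spark source: EchoEndpoint, EchoDemo and the package name are made up for illustration, and since Spark 2.1.0's rpc classes are private[spark], the sketch would have to be compiled under the org.apache.spark package against spark-core 2.1.0.

// A toy endpoint, NOT Spark source. The rpc API is private[spark], so this
// sketch assumes it lives somewhere under the org.apache.spark package.
package org.apache.spark.demo

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

// onStart runs once when setupEndpoint registers the endpoint,
// receive handles one-way send() messages, receiveAndReply handles ask().
private class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def onStart(): Unit = println("EchoEndpoint started")

  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"received: $msg")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Port 0 asks for an ephemeral port; the real Worker uses the port parsed from the command line
    val rpcEnv = RpcEnv.create("echoSystem", "localhost", 0, conf, new SecurityManager(conf))
    val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    ref.send("hello")        // fire-and-forget, handled by receive
    Thread.sleep(1000)       // give the dispatcher time to deliver the message
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}

The Worker is exactly this kind of endpoint, just with a much richer receive/receiveAndReply and an onStart that performs the registration work described below.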

Let's see what startRpcEnvAndEndpoint does.

The method creates and starts the Worker's RpcEnv and Endpoint:

def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    //  Create the RpcEnv
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    //  Register the Worker endpoint with the RpcEnv; its onStart method is called once the endpoint starts
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }
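
The SYSTEM_NAME and ENDPOINT_NAME used above are constants on the Worker companion object. Quoted from memory of Spark 2.1.0, so treat the exact wording as approximate:

// Worker companion object: the RpcEnv is named "sparkWorker" plus the optional
// instance number, and the endpoint registered inside it is simply named "Worker"
private val SYSTEM_NAME = "sparkWorker"
private val ENDPOINT_NAME = "Worker"

Since main does not pass a workerNumber, the RpcEnv here is simply named sparkWorker, and it contains an endpoint named Worker.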

With the Worker's RpcEnv and Endpoint created, the Worker Endpoint needs to be started.

When the Endpoint starts, its onStart method is invoked. In onStart the Worker:

1. Creates the Worker's working directory.

2. Creates and binds the Worker Web UI.

3. Calls registerWithMaster to register with the Master.

4. Registers the Worker source with the metrics system.

5. Starts the metrics system on the Worker.

override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    // Create the worker's working directory
    createWorkDir()
    shuffleService.startIfEnabled()
    // Create the WorkerWebUI and bind it
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
    //  The key step: register with the Master
    registerWithMaster()
    //  Register the worker source with the metrics system
    metricsSystem.registerSource(workerSource)
    //  Start the metrics system
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

The registerWithMaster method called in step 3 in turn calls tryRegisterAllMasters, which registers with every Master:

 registerMasterFutures = tryRegisterAllMasters()

Inside tryRegisterAllMasters:

A registration thread pool is created. Because registering with a Master is a blocking operation, the pool must be large enough to talk to all of the Master RPC addresses at the same time.
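
The pool and the per-Master registration tasks look roughly like this in Worker.scala (Spark 2.1.0). This is reproduced from memory, so treat the details as approximate; JFuture is just java.util.concurrent.Future aliased in Worker.scala:

// A cached daemon thread pool sized to the number of Masters, so every
// registration can be in flight at the same time
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
  "worker-register-master-threadpool",
  masterRpcAddresses.length // Make sure we can register with all masters at the same time
)

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Resolve the Master endpoint and register with it from a pool thread
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}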

Each registration task then calls registerWithMaster(masterEndpoint), which communicates with that Master and sends it the registration message:

registerWithMaster(masterEndpoint)

Let's look at registerWithMaster. It:

1. Sends a RegisterWorker message to the Master.

The message contains the following fields:

workerId: the ID identifying the current Worker

host: the Worker's host

port: the Worker's port

self: the current Worker's RpcEndpointRef

cores: the number of CPU cores

memory: the amount of memory

workerWebUiUrl: the Worker Web UI address

2. Waits for the Master's registration result and handles success or failure:

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // Registration with the Master succeeded
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            // Call handleRegisterResponse to handle the registration response
            handleRegisterResponse(msg)
          }
        // Registration failed: log the error and exit the process
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

We will cover the Worker-to-Master registration on the Master side in the next article.

For now, it is enough to know that after the Worker registers, the Master replies with a message msg telling the Worker whether the registration succeeded or failed.

When the Master's reply arrives, the Worker handles msg according to its message type:

If registration succeeded, the Worker starts the forwordMessageScheduler timer and begins sending heartbeats to the Master at a fixed interval; along with the heartbeats, it periodically reports the latest state of its executors to the Master.

If registration failed, it logs an error and exits.

 private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      // The worker registered with the Master successfully
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)
        // forwordMessageScheduler is a scheduled executor used to send periodic messages;
        // start it here so heartbeats are sent at a fixed rate
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Send a heartbeat
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }

        val execs = executors.values.map { e =>
          new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
        }
        masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }

      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }
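
How often does SendHeartbeat fire? The interval is derived from spark.worker.timeout. From memory of Spark 2.1.0 (so treat it as approximate), Worker.scala defines:

// Send a heartbeat every (worker timeout) / 4, i.e. every 15 seconds with the
// default spark.worker.timeout of 60 seconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

Sending four heartbeats per timeout window means a single dropped heartbeat is not enough for the Master to consider the Worker lost.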

After the Worker sends a heartbeat, the Master records the time of the last heartbeat it received from that Worker:

case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          //  Record the time of the last heartbeat
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

The Master also periodically checks whether its Workers have timed out:

case CheckForWorkerTimeOut =>
      timeOutDeadWorkers()
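
Where does the CheckForWorkerTimeOut message come from? The Master schedules it to itself in its own onStart, at the worker-timeout interval. From memory of Spark 2.1.0, roughly:

// Master.scala onStart(): periodically send CheckForWorkerTimeOut to ourselves
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CheckForWorkerTimeOut)
  }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)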

timeOutDeadWorkers checks each Worker's state and removes any Worker that has gone silent:

  // Check for, and remove, any timed-out workers
  private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    // The current time
    val currentTime = System.currentTimeMillis()
    // Select the workers whose last heartbeat is older than the timeout (60 seconds by default)
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    //  Remove the selected workers
    for (worker <- toRemove) {
      //  If the worker has not already been marked DEAD, remove it
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        //  Remove the worker
        removeWorker(worker)
      } else {
        // The worker is already DEAD; once it has been dead for long enough, drop it entirely
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }
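
The two constants used here are defined in Master.scala. From memory of Spark 2.1.0, so treat them as approximate:

// A worker is considered lost if no heartbeat arrives within spark.worker.timeout
// seconds (60 by default); a DEAD worker then stays visible in the UI for another
// REAPER_ITERATIONS timeout periods before it is culled from the workers set
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)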

That completes the Worker startup. See you in the next article!


If you made it this far, please give this post a like!

