Spark源码阅读:Worker的注册


在前一篇文章:Spark源码阅读:Worker的启动中,介绍了在我们通过shell脚本启动worker时,worker的启动流程

其中,在Worker Endpoint的OnStart方法中,Worker向Master发送消息,进行注册

今天来就来看一下Worker的注册

在master源码中,Worker的注册,与Driver的注册、Application注册代码结构类似

Driver的注册:Driver的注册与启动

Application的注册:Application的注册

来看代码:

Worker向Master发送msg:RegisterWorker

Master端通过模式匹配,接收到消息后,做了如下操作:

1.判断Master的状态是否为StandBy,则向Worker回应消息:当前的Master是StandBy的

2.如果Master的状态是Alive

先判断消息体中的WorkerId是否已存在,避免重复注册,如果WorkerId存在,则向Worker回应消息:重复的Worker ID

如果WorkerId不存在,则将消息中的worker id,host,端口,cpu数,内存等信息封装成一个WorkerInfo

3.接下来,调用registerWorker方法(下面会进行阅读),判断Worker是否注册成功

4.如果worker注册成功,使用持久化引擎将WorkerInfo持久化,调用scheduler方法,进行后续调度处理,至此worker注册完成

    如果注册失败,则向worker回应消息:注册失败,worker重复注册

关于Master的scheduler方法,详见:Master注册机制-Application的注册与Application的调度算法

/**
      *   处理Worker的注册请求
      */
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      // 判断Master的状态是否为StandBy
      if (state == RecoveryState.STANDBY) {
        // 如果是StandBy状态则回应当前其状态为StandBy
        context.reply(MasterInStandby)
        //  如果为Alive,判断WorkerId是否存在,避免重复注册
      } else if (idToWorker.contains(id)) {
        //  如果WorkerId存在,则回复Worker注册失败“重复的worker id”
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        //  将worker的id,host,端口,cpu数,内存等信息封装成一个WorkerInfo
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)

 //  判断注册Worker注册是否成功
        if (registerWorker(worker)) {
          //  使用持久化引擎将WorkerInfo持久化
          persistenceEngine.addWorker(worker)
          //  context回应,向StandaloneSchedulerBackend的StandaloneAppClient发送消息RegisteredWorker,已注册Worker context.reply(RegisteredWorker(self, masterWebUiUrl))
          //  执行scheduler,去进行调度
          schedule()
          //  Worker注册完成
        } else {
          //  worker重复注册
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          //  回应注册失败,worker在同一地址重复注册
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }

注册过程中,我们看一下重要的registerWorker方法,该方法执行步骤如下:

1.过滤掉状态为DEAD的worker

2.判断worker地址是否存在

如果worker存在,则该worker为old worker,并判断old worker的状态,如果为UNKOWN则移除该worker,如果状态不是UNKOWN,则打印日志Worker重复注册,注册失败,返回false

如果worker不存在,则将新的worker添加到HashSet中,将Worker id,address信息添加到对应的HashMap中

3.如果web ui配置了spark.ui.reverseProxy

则配置代理

4.返回True,即注册成功

 //  worker注册
  private def registerWorker(worker: WorkerInfo): Boolean = {
    //  过滤掉状态为DEAD的worker
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      //  遍历workers HashSet,移除该worker
      workers -= w
    }

    val workerAddress = worker.endpoint.address
    // 判断worker地址是否已存在
    if (addressToWorker.contains(workerAddress)) {
      // 如果已存在,该worker为oldWorker
      val oldWorker = addressToWorker(workerAddress)
      //  如果oldWorker的状态为UNKOWN,则移除该oldWorker
      if (oldWorker.state == WorkerState.UNKNOWN) {
        removeWorker(oldWorker) 
      } else {
        //  如果状态不是UNKOWN,则worker已存在
        //  打印如下信息:尝试重新注册worker在相同地址:XXXXXX
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        //  注册worker失败,worker在同一地址重复注册,返回False,worker注册失败
        return false
      }
    }

    //  将新的worker添加到HashSet中去
    workers += worker
    //  将worker id,address信息添加到对应的HashMap中
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    //  如果开启了spark.ui.reverseProxy
    if (reverseProxy) {
      //  添加代理目标
       webUi.addProxyTargets(worker.id, worker.webUiAddress)
    }
    //  返回True,worker注册成功
    true
  } 

总结:

这样设计Worker注册机制有一个很大的好处,在生产环境下,想要把新的Worker 加入到已经运行的Spark 集群上,不需要重新启动Spark 集群,就能够使用新加入的Worker以提升处理性能

Worker 启动后会调用onStart( ) 方法,然后调用 registerWithMaster( ) 来注册给Master

最后将整个Worker注册过程总结如下图:

亲,看完了点个赞呀!!

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

发表评论

邮箱地址不会被公开。