Spark Source Code Reading: Application Registration
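Series guide:
If you want to study the Spark source code, please read the articles in the order listed below.
6. Master registration mechanism - Application registration and the Application scheduling algorithm
8. Starting CoarseGrainedExecutorBackend
10. DAGScheduler: Stage division and computing the best Task locations
13. Starting a Task
14. Handling Task results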
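The previous article covered the initialization of TaskScheduler. This article continues with how StandaloneSchedulerBackend registers and submits a Spark Application.
StandaloneSchedulerBackend runs inside the Driver. In Standalone mode it handles communication with the resource manager (the Master) and with the Executors.
In its start method, StandaloneSchedulerBackend first calls the start method of its parent class CoarseGrainedSchedulerBackend, which creates the DriverEndpoint.
Its main job after that is to create the AppClient. The AppClient registers the Application with the Master, and the Master uses the Application's information to allocate Workers for it.
Before the AppClient is created, some preparation is needed to collect the required parameters: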
// Address of the Driver's RpcEndpoint
val driverUrl = RpcEndpointAddress(
  sc.conf.get("spark.driver.host"),
  sc.conf.get("spark.driver.port").toInt,
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// Launch arguments for the executor process
val args = Seq(
  "--driver-url", driverUrl,
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")
// Extra Java options for the executor
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
  .map(Utils.splitCommandString).getOrElse(Seq.empty)
// Extra classpath entries for the executor
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// Extra library path entries for the executor
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

// Spark-related Java options
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
// Combine all Java options
val javaOpts = sparkJavaOpts ++ extraJavaOpts

// Wrap the collected information in a Command
// (testingClassPath is defined in the full source and is not shown in this excerpt)
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
// Address of the application UI
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
// Number of cores per executor
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
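The `{{EXECUTOR_ID}}`-style values in the launch arguments are placeholders: they cannot be known on the Driver and are filled in later, when an executor is actually launched. The following is a minimal sketch of that idea only. The `substitute` helper, the object name, and the sample values are hypothetical and are not taken from the Spark source.

```scala
object PlaceholderSketch {
  // Hypothetical helper: replaces every {{KEY}} token in each argument
  // with the matching value from `values`.
  def substitute(args: Seq[String], values: Map[String, String]): Seq[String] =
    args.map { arg =>
      values.foldLeft(arg) { case (acc, (key, value)) =>
        acc.replace("{{" + key + "}}", value)
      }
    }

  // A shortened version of the argument template built on the Driver.
  val template: Seq[String] = Seq(
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}")

  // Made-up values, for illustration only.
  val resolved: Seq[String] = substitute(template, Map(
    "EXECUTOR_ID" -> "0",
    "HOSTNAME" -> "worker-1",
    "CORES" -> "4"))
}
```

Keeping the template on the Driver side and resolving it late means the same Command can be reused for every executor of the application.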
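The parameters prepared above, together with information from the SparkContext, are wrapped in an ApplicationDescription object.
The ApplicationDescription is then passed in to create the StandaloneAppClient. The client's initialization, which is covered below, takes place as part of this step.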
  // Create the ApplicationDescription
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  // Create the StandaloneAppClient and hand it the ApplicationDescription
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  // Start the AppClient; it reports back to the listener
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  // Block until the registration state is updated
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
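The call to waitForRegistration() blocks the start method until the listener has been told that registration succeeded. A minimal sketch of that hand-off follows, assuming a semaphore-style barrier. The class names, the demo object, and the application id are illustrative and are not copied from Spark.

```scala
import java.util.concurrent.Semaphore

// Hypothetical stand-in for the part of the backend that waits for registration.
final class RegistrationBarrierSketch {
  private val registrationBarrier = new Semaphore(0)
  @volatile private var appId: Option[String] = None

  // Listener callback: invoked from another thread once registration is confirmed.
  def connected(id: String): Unit = {
    appId = Some(id)
    registrationBarrier.release()
  }

  // Blocks the caller until connected() has released the barrier.
  def waitForRegistration(): Option[String] = {
    registrationBarrier.acquire()
    appId
  }
}

object RegistrationBarrierDemo {
  def run(): Option[String] = {
    val backend = new RegistrationBarrierSketch
    // Simulates the reply arriving asynchronously.
    val masterReply = new Thread(new Runnable {
      override def run(): Unit = backend.connected("app-0001")
    })
    masterReply.start()
    backend.waitForRegistration()
  }
}
```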
The ApplicationDescription that is sent has the following structure:
private[spark] case class ApplicationDescription(
    name: String,  // application name, set through spark.app.name or setAppName() in code
    maxCores: Option[Int],  // maximum number of CPU cores the application may use
    memoryPerExecutorMB: Int,  // memory used by each executor, in MB
    command: Command,  // launch command
    appUiUrl: String,  // URL of the application UI
    eventLogDir: Option[URI] = None,  // event log directory
    // short name of the compression codec used when writing event logs, if configured (e.g. lzf)
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    // number of executors this application wants to start with,
    // only used when dynamic allocation is enabled
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}
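Creating the AppClient takes the parameters rpcEnv, masters, appDesc, this, and conf.
The `this` argument deserves a closer look: the corresponding parameter is the listener.
StandaloneAppClientListener is a trait. It ensures that StandaloneSchedulerBackend is notified promptly of state changes when various events occur in the AppClient.
When StandaloneSchedulerBackend creates the AppClient, a listener is required, so it passes itself to the AppClient as that argument. That is the `this`.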
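/**
 * Callbacks that notify StandaloneSchedulerBackend promptly when various events occur.
 * There are currently five callbacks:
 * 1. connected  2. disconnected  3. dead  4. executor added  5. executor removed
 */
private[spark] trait StandaloneAppClientListener {
  // The application was registered with the master and is connected to the cluster
  def connected(appId: String): Unit

  // Disconnected from the master.
  // This may be temporary, for example when the original master fails;
  // the connection is restored once another master takes over
  def disconnected(): Unit

  // The application stopped because of an unrecoverable error
  def dead(reason: String): Unit

  // An executor was added
  def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

  // An executor was removed
  def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
}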
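The listener arrangement described above is a plain callback pattern: the client only knows the trait, and the backend hands itself over as the implementation. A minimal sketch, using simplified stand-ins, is shown here. `ListenerSketch`, `ClientSketch`, and `BackendSketch` are hypothetical names, and only three of the callbacks are reproduced.

```scala
// Simplified stand-in for StandaloneAppClientListener: same callback names,
// but only three of the callbacks are reproduced here.
trait ListenerSketch {
  def connected(appId: String): Unit
  def disconnected(): Unit
  def dead(reason: String): Unit
}

// Hypothetical client: it depends only on the listener interface, not on the backend class.
final class ClientSketch(listener: ListenerSketch) {
  def onRegistered(appId: String): Unit = listener.connected(appId)
  def onMasterLost(): Unit = listener.disconnected()
  def onFatalError(reason: String): Unit = listener.dead(reason)
}

// Hypothetical backend: implements the listener and passes itself (`this`) to the client.
final class BackendSketch extends ListenerSketch {
  @volatile var state: String = "NEW"
  val client = new ClientSketch(this)

  override def connected(appId: String): Unit = { state = s"CONNECTED($appId)" }
  override def disconnected(): Unit = { state = "DISCONNECTED" }
  override def dead(reason: String): Unit = { state = s"DEAD($reason)" }
}
```

Because the client depends only on the trait, it can notify whichever component created it without knowing anything about scheduling.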
Once the StandaloneAppClient has been created and started, its onStart initialization method runs. It calls registerWithMaster to register with the master.
override def onStart(): Unit = {
  try {
    // Register with the master
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
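The registerWithMaster method actually calls tryRegisterAllMasters to register with every Master. In Spark the Master can be deployed for high availability (HA), in which case several Masters exist. Registration is therefore sent to all of them, but in the end only the Active Master responds.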
/**
 * Register with all masters asynchronously. registerWithMaster keeps being called
 * until the registration retries are exhausted.
 * Once a connection to a master succeeds, all scheduled work and Futures are cancelled.
 */
private def registerWithMaster(nthRetry: Int) {
  // Delegates to tryRegisterAllMasters to register with every master
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
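The decision taken each time the retry timer fires can be isolated from the threading details. The sketch below models only that decision as a pure function, with the timer and thread pool left out. The `attempt` function, the `Outcome` type, and the retry limit of 3 are assumptions made for illustration.

```scala
import scala.annotation.tailrec

object RetrySketch {
  sealed trait Outcome
  case object Registered extends Outcome
  case object GaveUp extends Outcome

  // Assumed retry limit, for illustration only.
  val RegistrationRetries = 3

  // attempt(n) returns true if some master acknowledged during the n-th round.
  // Returns the outcome together with the round in which it was decided.
  @tailrec
  def registerWithMaster(nthRetry: Int, attempt: Int => Boolean): (Outcome, Int) =
    if (attempt(nthRetry)) (Registered, nthRetry)              // registered: stop retrying
    else if (nthRetry >= RegistrationRetries) (GaveUp, nthRetry) // all masters unresponsive
    else registerWithMaster(nthRetry + 1, attempt)              // try another round
}
```

For example, `RetrySketch.registerWithMaster(1, _ == 2)` succeeds in the second round, while an `attempt` that always returns false gives up after the third.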
The tryRegisterAllMasters method sends the registration to all masters:
/**
 * Register with all masters asynchronously and return an array of [Future]s
 * that can be used for cancellation later.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // Register the application with the master
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
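The fan-out described above, where one task is submitted per master and only the active master answers, can be sketched with a plain thread pool. This is a simplified model under stated assumptions: `MasterSketch`, its `active` flag, and `registerAll` are hypothetical, and a real master replies over RPC rather than through a shared variable.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of a master: only an active one acknowledges a registration.
final class MasterSketch(val name: String, val active: Boolean)

object RegisterAllSketch {
  // Submits one registration task per master and reports which master acknowledged, if any.
  def registerAll(masters: Seq[MasterSketch]): Option[String] = {
    val pool = Executors.newFixedThreadPool(math.max(1, masters.size))
    val acknowledgedBy = new AtomicReference[Option[String]](None)
    try {
      val futures = masters.map { m =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            // Standby masters stay silent; the active one records its acknowledgement.
            if (m.active) {
              acknowledgedBy.compareAndSet(None, Some(m.name))
            }
            ()
          }
        })
      }
      // Wait for every task so the result is deterministic in this sketch.
      futures.foreach(_.get())
      acknowledgedBy.get()
    } finally {
      pool.shutdown()
    }
  }
}
```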
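After the registration request is sent, the Master receives the ApplicationDescription, wraps it in an ApplicationInfo, and calls registerApplication to register the application.
Once registration succeeds, the Master sends a message to the AppClient of StandaloneSchedulerBackend saying that the Application has been registered. When the listener receives this message, its state changes to connected.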
// The Master sends RegisteredApplication to the StandaloneAppClient of
// StandaloneSchedulerBackend (referenced here as `driver`): the Application is registered
driver.send(RegisteredApplication(app.id, self))
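Finally, the Master calls the schedule method. It first calls launchDriver to launch any waiting drivers on Workers, and then calls startExecutorsOnWorkers. That method in turn calls allocateWorkerResourceToExecutors to allocate resources to executors and start them on the Workers. Tasks are then run on those executors.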
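If any part of the application registration process is unclear, the earlier articles in this series cover it:
Spark Source Code Reading: How Master Registration Works - Application Registration
Spark Source Code Reading: How Master Registration Works - Worker Registration and Driver Registration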