Reading the Spark Source: Driver Registration and Launch
Series guide:
If you want to work through the Spark source code, read the articles in this series in order:
6. Master registration: registering Applications and the Application scheduling algorithm
8. Launching CoarseGrainedExecutorBackend
10. DAGScheduler: stage splitting and computing the best locations for Tasks
13. Launching Tasks
14. Handling Task results
The previous article, Reading the Spark Source: Spark Submit, covered how a user submits a job with spark-submit: ClientEndpoint sends a RequestSubmitDriver message to the Master to register the Driver.
This article looks at what happens after the Master receives that message: the registration and launch of the Driver.
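For reference, the two RPC messages involved here are defined in org.apache.spark.deploy.DeployMessages; slightly abridged, they look like this:

case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage

case class SubmitDriverResponse(
    master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
  extends DeployMessage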
Open org.apache.spark.deploy.master.Master.scala.
When the Master receives a RequestSubmitDriver message, it does the following:
1. Check that the Master's state is ALIVE.
2. Call createDriver with the incoming DriverDescription to build a DriverInfo wrapper.
3. Persist the DriverInfo with the persistenceEngine.
4. Append the driver to the waitingDrivers queue.
5. Add the driver to the drivers HashSet.
6. Call schedule to trigger scheduling.
// Handle a driver submission request; the payload is a DriverDescription
case RequestSubmitDriver(description) =>
  // Only an ALIVE Master can accept submissions
  if (state != RecoveryState.ALIVE) {
    // Reply that the submission failed and must go to the ALIVE Master
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    // Wrap the description into a DriverInfo
    val driver = createDriver(description)
    // Persist the DriverInfo with the persistence engine
    persistenceEngine.addDriver(driver)
    // Queue the driver in waitingDrivers
    waitingDrivers += driver
    // Track the driver in the drivers HashSet
    drivers.add(driver)
    // Run the scheduler
    schedule()

    // Acknowledge the submission with the new driver id
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
The createDriver method wraps the current system time, a new driver id, and the driver description into a DriverInfo and returns it:
// Create the driver, returning a fully populated DriverInfo
private def createDriver(desc: DriverDescription): DriverInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  // Wrap the current time, a new driver id, the description and the date into a DriverInfo
  new DriverInfo(now, newDriverId(date), desc, date)
}
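newDriverId builds ids of the form driver-yyyyMMddHHmmss-NNNN from the submission date and a monotonically increasing counter; you can spot one (driver-20180423114006-0009) in the launch command later in this article. In the Spark source it looks roughly like this:

// createDateFormat is a SimpleDateFormat("yyyyMMddHHmmss")
private def newDriverId(submitDate: Date): String = {
  val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
  nextDriverNumber += 1
  appId
}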
Next, look at the schedule method:
1. Check the Master's current state; if it is not ALIVE, return immediately.
2. Shuffle (randomly permute) the workers that are currently in ALIVE state.
3. Iterate over waitingDrivers and, for each driver, walk through the shuffled alive workers; if a worker's free memory and cores are at least what the driver requires, call launchDriver to start the driver on that worker, remove the driver from the waitingDrivers queue, and mark it as launched (a self-contained sketch of this round-robin placement follows the code below).
4. Finally, call startExecutorsOnWorkers.
/**
 * Schedule the currently available resources among waiting apps. This method
 * will be called every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // If the Master is not ALIVE, do nothing
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors.
  // Shuffle the ALIVE workers so drivers spread across the cluster
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of workers currently ALIVE
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  // Iterate over a copy of waitingDrivers and assign a worker to each waiting driver.
  // For each driver, start from the position after the last assigned worker and walk
  // forward until every alive worker has been considered.
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    // Keep probing while there are unvisited workers and the driver is not yet launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Pick the next worker from the shuffled list
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // Launch the driver here if the worker's free memory and cores cover the driver's needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        // Remove the driver from the waiting queue and mark it launched
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Now start executors on workers for the waiting applications
  startExecutorsOnWorkers()
}
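To make the shuffled round-robin placement concrete, here is a minimal, self-contained sketch; Worker and Driver below are simplified stand-ins for WorkerInfo and DriverInfo, not Spark's real classes:

import scala.util.Random

case class Worker(id: String, var memFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

// Returns driver id -> worker id for every driver that could be placed.
def place(workers: Seq[Worker], drivers: Seq[Driver]): Map[String, String] = {
  val shuffled = Random.shuffle(workers) // spread drivers across the cluster
  var curPos = 0
  var placed = Map.empty[String, String]
  for (d <- drivers) {
    var visited = 0
    var launched = false
    // walk forward from the last position until every worker has been probed once
    while (visited < shuffled.size && !launched) {
      val w = shuffled(curPos)
      visited += 1
      if (w.memFree >= d.mem && w.coresFree >= d.cores) {
        w.memFree -= d.mem
        w.coresFree -= d.cores
        placed += (d.id -> w.id)
        launched = true
      }
      curPos = (curPos + 1) % shuffled.size
    }
  }
  placed
}

println(place(
  Seq(Worker("w1", 1024, 2), Worker("w2", 4096, 8)),
  Seq(Driver("d1", 2048, 4), Driver("d2", 512, 1))))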
Next, the launchDriver method, which does the following:
1. Record the driver on the WorkerInfo.
2. Record the worker on the DriverInfo.
3. Send a LaunchDriver message to the Worker through its RpcEndpointRef, telling it to start the driver (the message definition is shown after the code below).
4. Update the driver's state to RUNNING.
// Launch a driver on the given worker
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Record the driver on the WorkerInfo (this also accounts for the worker's resources)
  worker.addDriver(driver)
  // Record the worker on the DriverInfo
  driver.worker = Some(worker)
  // Tell the Worker to launch the driver via its RpcEndpointRef
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // Mark the driver as RUNNING
  driver.state = DriverState.RUNNING
}
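LaunchDriver itself is another DeployMessage, carrying just the driver id and its description:

case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage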
That completes registration on the Master side. Now move to org.apache.spark.deploy.worker.Worker.scala.
When the Worker receives the LaunchDriver message, it:
1. Wraps the driver information into a DriverRunner object.
2. Stores the DriverRunner in the drivers HashMap.
3. Calls the DriverRunner's start method.
4. Accounts for the cores and memory now in use.
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Wrap the driver information into a DriverRunner
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // Keep the DriverRunner in the drivers HashMap
  drivers(driverId) = driver
  // Start the runner
  driver.start()
  // Account for the cores and memory taken by this driver
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
Looking at DriverRunner's start method:
1. It calls prepareAndRunDriver, which prepares the driver's jar and runs the driver.
2. It maps prepareAndRunDriver's exit code to a final state.
3. It sends a DriverStateChanged message to the worker to report the state change (the message definition is shown after the code below).
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  // Spawn a dedicated thread for this driver
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // Prepare the driver's jars and run the driver; returns an exit code
        val exitCode = prepareAndRunDriver()

        // Map the exit code to a final state: FINISHED, KILLED or FAILED
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // Notify the worker that the driver's state has changed
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
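The DriverStateChanged message sent here is defined in DeployMessages roughly as:

case class DriverStateChanged(
    driverId: String,
    state: DriverState,
    exception: Option[Exception])
  extends DeployMessage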
prepareAndRunDriver performs these steps:
1. Create the driver's working directory.
2. Download the user jar into that directory.
3. Define substituteVariables, which pattern-matches arguments and replaces the {{WORKER_URL}} and {{USER_JAR}} placeholders (a standalone example follows the code below).
4. Build a ProcessBuilder from the driver's launch command, the securityManager, the driver's memory requirement, the absolute path of the spark home, and the substitution function.
5. Call runDriver.
private[worker] def prepareAndRunDriver(): Int = {
  // Create the driver's working directory
  val driverDir = createWorkingDirectory()
  // Download the user jar into the working directory
  val localJarFilename = downloadUserJar(driverDir)

  // Substitute placeholder arguments with their actual values
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // Build the ProcessBuilder from the launch command, the security manager,
  // the driver's memory, the absolute spark home and the substitution function
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
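To see what substituteVariables does, here is a standalone example with hypothetical values (the worker URL and jar path below are made up for illustration):

val workerUrl = "spark://Worker@192.168.1.20:40612"  // hypothetical
val localJarFilename = "/tmp/work/driver-0/app.jar"  // hypothetical

def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{USER_JAR}}"   => localJarFilename
  case other            => other
}

val args = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "org.example.Main", "1000")
println(args.map(substituteVariables))
// List(spark://Worker@192.168.1.20:40612, /tmp/work/driver-0/app.jar, org.example.Main, 1000)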
In runDriver, the process's InputStream and error stream are redirected to stdout and stderr files in the driver's working directory.
These are the two log files you see under Logs on the Spark application web page.
Finally, runDriver calls runCommandWithRetry.
runDriver defines an initialize callback that:
1. Creates the stdout and stderr files.
2. Formats the builder's command and writes it as a header.
3. Redirects the process's error stream into the stderr file.
At the end, runDriver invokes runCommandWithRetry with the ProcessBuilder (wrapped as a ProcessBuilderLike), the initialize callback, and the supervise flag.
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    val stdout = new File(baseDir, "stdout")
    // Pipe the process's InputStream into the stdout file
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    // Pretty-print the launch command as a header in stderr
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, StandardCharsets.UTF_8)
    // Pipe the process's error stream into the stderr file
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  // Hand off to runCommandWithRetry
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
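CommandUtils.redirectStream essentially pumps a stream into a file on a background thread. A simplified sketch of the idea (not the exact Spark code):

import java.io.{File, FileOutputStream, InputStream}

// Copy everything from `in` into `file` on a daemon thread, appending.
def redirectStream(in: InputStream, file: File): Unit = {
  val out = new FileOutputStream(file, true)
  val t = new Thread("redirect output to " + file) {
    override def run(): Unit = {
      try {
        val buf = new Array[Byte](8192)
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally {
        out.close()
      }
    }
  }
  t.setDaemon(true)
  t.start()
}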
runCommandWithRetry is where the command is actually executed on the system; it returns the process's exit code. Note that the retry loop only continues when supervise is enabled, the process exited with a non-zero code, and the driver has not been killed; without --supervise the command runs exactly once.
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  // Default exit code
  var exitCode = -1
  // Time to wait between submission retries
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off
  val successfulRunDuration = 5
  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      // If the driver was killed, bail out with the current exit code
      if (killed) { return exitCode }
      // Start the process
      process = Some(command.start())
      // Run the initialize callback (stream redirection, see above)
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    // Wait for the process to finish and grab its exit code
    exitCode = process.get.waitFor()

    // Check whether another run should be attempted
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      // A sufficiently long run resets the back-off
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }

  // Return the final exit code
  exitCode
}
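A tiny illustration of the back-off behaviour (not Spark code): each short-lived failed run doubles the wait, and any run longer than successfulRunDuration (5 s) resets it to 1 s:

val successfulRunDuration = 5
var waitSeconds = 1
for (runSeconds <- Seq(1, 1, 1, 10, 1)) {
  if (runSeconds > successfulRunDuration) waitSeconds = 1
  println(s"process ran ${runSeconds}s, retrying in ${waitSeconds}s")
  waitSeconds *= 2
}
// process ran 1s, retrying in 1s
// process ran 1s, retrying in 2s
// process ran 1s, retrying in 4s
// process ran 10s, retrying in 1s
// process ran 1s, retrying in 2s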
The command actually executed on the system looks like the following (details depend on your environment).
This is the real launch: the Driver process is started with this command.
/opt/jdk1.8/bin/java -Dhdp.version=2.6.0.3-8 -cp /usr/hdp/current/spark2-historyserver/conf/:/usr/hdp/2.6.0.3-8/spark2/jars/*:/usr/hdp/current/hadoop-client/conf/ -Xmx1024M -Dspark.history.kerberos.keytab=none -Dspark.eventLog.enabled=true -Dspark.yarn.historyServer.address=192.168.1.20:18081 -Dspark.jars=file:/usr/hdp/2.6.0.3-8/spark2/bin/../examples/jars/spark-examples_2.11-2.1.0.2.6.0.3-8.jar -Dspark.submit.deployMode=cluster -Dspark.history.ui.port=18081 -Dspark.executor.memory=2G -Dspark.driver.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 -Dspark.yarn.queue=default -Dspark.executor.extraLibraryPath=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 -Dspark.history.fs.logDirectory=hdfs:///spark2-history/ -Dspark.driver.supervise=false -Dspark.app.name=org.apache.spark.examples.SparkPi -Dspark.history.kerberos.principal=none -Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider -Dspark.cores.max=5 -Dspark.rpc.askTimeout=10s -Dspark.master=spark://192.168.1.20:7077 -Dspark.eventLog.dir=hdfs:///spark2-history/ org.apache.spark.deploy.worker.DriverWrapper spark://Worker@192.168.1.20:40612 /usr/hdp/2.6.0.3-8/spark2/work/driver-20180423114006-0009/spark-examples_2.11-2.1.0.2.6.0.3-8.jar org.apache.spark.examples.SparkPi 1000
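Note that the class being launched is org.apache.spark.deploy.worker.DriverWrapper, not the user class itself. DriverWrapper sets up an RpcEnv with a WorkerWatcher (so the driver exits if its Worker dies; that is the spark://Worker@... argument above), puts the user jar on the classpath, and then calls the user's main class (here org.apache.spark.examples.SparkPi) via reflection. Simplified from the Spark source:

object DriverWrapper {
  // argument order: workerUrl, userJar, mainClass, app args...
  def main(args: Array[String]): Unit = args.toList match {
    case workerUrl :: userJar :: mainClass :: extraArgs =>
      // (omitted: create the RpcEnv, register a WorkerWatcher on workerUrl,
      //  and put userJar on the context classloader)
      val clazz = Class.forName(mainClass)
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, extraArgs.toArray)
    case _ =>
      System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
      System.exit(-1)
  }
}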
Removing the Driver:
The DriverRunner maps the exit code to a final state and notifies the worker of the state change.
When the worker receives that message, it:
1. Forwards the DriverStateChanged message to the Master.
2. Removes the driver from the drivers collection.
3. Adds the driver to the finishedDrivers collection.
4. Releases the memory and CPU cores that were allocated to the driver.
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state

  // Log according to the final state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Forward the state change to the Master
  sendToMaster(driverStateChanged)
  // Drop the driver from the drivers map
  val driver = drivers.remove(driverId).get
  // Remember it in finishedDrivers (trimmed to a bounded size)
  finishedDrivers(driverId) = driver
  trimFinishedDriversIfNecessary()
  // Release the memory and cores the driver was using
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
When the Master receives the message, it matches on the state; for ERROR, FINISHED, KILLED or FAILED it calls removeDriver to remove the Driver.
case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
removeDriver performs a series of cleanup steps:
1. Remove the driver from the drivers collection.
2. Add the driver to the completedDrivers collection.
3. Remove the driver from the persistence engine.
4. Set the driver's state to its final state.
5. Remove the driver from its worker.
6. Call schedule to re-schedule the freed resources.
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // Look up the driver by id
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Drop the driver from the drivers set
      drivers -= driver
      // Trim completedDrivers if it has grown past RETAINED_DRIVERS
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Remember the driver in completedDrivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record the driver's final state and exception, if any
      driver.state = finalState
      driver.exception = exception
      // Detach the driver from its worker, freeing the worker's resources
      driver.worker.foreach(w => w.removeDriver(driver))
      // Re-run the scheduler now that resources have been freed
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
That completes the Driver registration and launch flow.
See you in the next article!
If this walkthrough helped, give it a like!