Reading the Flink Source Code (3): Flink Job Submission

Continuing from the previous article: the client pattern-matches on the "action" argument, and if it is run, executes the corresponding logic:

case ACTION_RUN:
    run(params);
    return 0;

In this article we follow the run method and see what is inside.

The run method implements the "run" action and mainly does the following:

1. Load the configuration.

2. Parse the command-line arguments; if they do not conform to the rules, print Flink's usage help.

3. Check whether the user-specified JAR path is empty; if it is, throw a CliArgsException. The pattern-matching code shown earlier catches the exception, handles it, and prints the help message:

private static int handleArgException(CliArgsException e) {
		LOG.error("Invalid command line arguments.", e);
		System.out.println(e.getMessage());
		System.out.println();
		System.out.println("Use the help option (-h or --help) to get help on the command.");
		return 1;
	}

If an exception occurs in step 2 or 3, the output looks like the following example:

[Figure: example console output of the error and help message]

4. Create a PackagedProgram object. org.apache.flink.client.program.PackagedProgram works on the user-specified JAR to:

(1) find the program entry point;

(2) parse the user code to obtain the job topology;

(3) extract nested libraries.

Here it is used to find the entry point of the user's program, as the sketch below illustrates.
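
To make this concrete, here is a minimal sketch of constructing a PackagedProgram by hand. The JAR path and entry class are hypothetical, and the constructor shown is from the Flink 1.7-era API, so treat this as an illustration rather than the exact call the CLI makes (the CLI goes through buildProgram):

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;

public class PackagedProgramSketch {
	public static void main(String[] args) throws Exception {
		// Hypothetical user JAR and entry class, for illustration only.
		PackagedProgram program = new PackagedProgram(
				new File("/path/to/user-job.jar"),
				"com.example.MyFlinkJob");

		// The entry point resolved from the JAR (or its manifest).
		System.out.println("Entry point: " + program.getMainClassName());

		// Clean up the nested libraries extracted from the JAR,
		// mirroring the finally block in run() below.
		program.deleteExtractedLibraries();
	}
}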

5. (The key step) Call the runProgram method, passing in the program entry point and configuration prepared above, to run the program.

6. Delete the temporary files created while extracting the JAR.

protected void run(String[] args) throws Exception {
		LOG.info("Running 'run' command.");
		//	prepare the configuration
		final Options commandOptions = CliFrontendParser.getRunCommandOptions();

		final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);

		final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);

		final RunOptions runOptions = new RunOptions(commandLine);

		// evaluate help flag
		//	parse the Flink command-line arguments and check whether they are valid; if not, print the help message
		if (runOptions.isPrintHelp()) {
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}
		// check that the user-specified JAR file path is present; if it is null, throw a CliArgsException
		if (runOptions.getJarFilePath() == null) {
			throw new CliArgsException("The program JAR file was not specified.");
		}
		//	create the PackagedProgram object
		final PackagedProgram program;
		try {
			LOG.info("Building program from JAR file");
			program = buildProgram(runOptions);
		} catch (FileNotFoundException e) {
			throw new CliArgsException("Could not build the program from JAR file.", e);
		}

		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);

		try {
			//	call the runProgram method
			runProgram(customCommandLine, commandLine, runOptions, program);
		} finally {
			//	finally, delete the temporary files produced during JAR extraction
			program.deleteExtractedLibraries();
		}
	}

Now let's look at the runProgram method.

The code is fairly long, so here is a summary first:

1. Create a ClusterDescriptor from the user's command-line arguments. A ClusterDescriptor describes the properties of a cluster and is used to deploy one (e.g. on YARN or Mesos), returning a client that communicates with it.

2. Get the cluster id.

3. Create a ClusterClient; the ClusterClient encapsulates everything needed to submit a program to a remote cluster.

4. If the cluster id is null and the job runs in detached mode, i.e. the -d flag was passed on the command line (as in ./bin/flink run -d ...):

A quick note on detached mode: the client submits the job and exits immediately instead of waiting for the job to finish, and a per-job cluster started this way is torn down once the job completes.

(1) Determine the parallelism.

(2) Parse the user program into the DAG, i.e. the JobGraph.

Before reading Flink's DAG-building code, let's make a guess: Flink's DAG construction, submission, and resource allocation may well resemble Spark's DAG handling.

Flink has no distinction between Spark's transformation and action operators, but computation in Flink is triggered by env.execute(), which plays a role similar to a Spark action; Flink's DAG topology is then split up according to the operators. A minimal example of this trigger follows below.
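
To make the comparison concrete, here is a minimal user program using the standard DataStream API (a sketch of my own, not taken from the Flink sources). Building the pipeline only records the topology; nothing is computed until env.execute() is called, which is what makes it resemble a Spark action:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteTriggerSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// These calls only build the dataflow graph; nothing runs yet.
		env.fromElements("to", "be", "or", "not", "to", "be")
			.map(word -> word.toUpperCase())
			.print();

		// Only this call triggers planning and execution of the recorded
		// topology, loosely analogous to a Spark action.
		env.execute("execute-trigger-sketch");
	}
}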

Whether the two are really similar, later articles will show.

(3) Get the cluster specification from the command line.

(4) Pass the generated JobGraph, the cluster specification, and the run mode to clusterDescriptor.deployJobCluster, which deploys the job and returns a ClusterClient.

(5) Shut down the client.

Note that the JobGraph is generated at this point only in detached mode; on the normal path it is not generated here, but later, when the user's main method runs and env.execute() is reached. How the DAG is generated will be covered in a later article.

5. If the condition in step 4 does not hold (i.e. a cluster id exists, or the job is not detached), submit in the normal way:

(1) Prepare a shutdownHook for shutting down the cluster; in non-detached mode this is needed so the cluster is closed after the client exits.

(2) Check the cluster id: if it is not null, retrieve a ClusterClient for the existing cluster; if it is null, deploy a new session cluster, obtain its client, and register the shutdownHook from step (1) where applicable.

(3) Determine the parallelism.

(4) Call the executeProgram method to execute the program.

6. Wait for the program to run, then shut down the client and, if we started the cluster ourselves, the cluster too.

	private <T> void runProgram(CustomCommandLine<T> customCommandLine, CommandLine commandLine, RunOptions runOptions,
			PackagedProgram program) throws ProgramInvocationException, FlinkException {
		final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

		try {
			//	get the cluster id
			final T clusterId = customCommandLine.getClusterId(commandLine);
			//	the ClusterClient encapsulates everything needed to submit a program to a remote cluster
			final ClusterClient<T> client;

			// directly deploy the job if the cluster is started in job mode and detached
			if (clusterId == null && runOptions.getDetachedMode()) {
				// fall back to the default parallelism if none was given on the command line
				int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
				// build the JobGraph (the DAG topology) from the user program
				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
				//	cluster specification
				final ClusterSpecification clusterSpecification = customCommandLine
						.getClusterSpecification(commandLine);
				//	the client deploys the job to the cluster
				client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph,
						runOptions.getDetachedMode());

				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

				try {
					client.shutdown();
				} catch (Exception e) {
					LOG.info("Could not properly shut down the client.", e);
				}
			} else {
				final Thread shutdownHook;
				if (clusterId != null) {
					client = clusterDescriptor.retrieve(clusterId);
					shutdownHook = null;
				} else {
					final ClusterSpecification clusterSpecification = customCommandLine
							.getClusterSpecification(commandLine);
					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
					
					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster,
								client.getClass().getSimpleName(), LOG);
					} else {
						shutdownHook = null;
					}
				}

				try {
					client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
					client.setDetached(runOptions.getDetachedMode());
					LOG.debug("Client slots is set to {}", client.getMaxSlots());

					LOG.debug("{}", runOptions.getSavepointRestoreSettings());

					int userParallelism = runOptions.getParallelism();
					LOG.debug("User parallelism is set to {}", userParallelism);
					if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) {
						logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots()
								+ "). " + "To use another parallelism, set it at the ./bin/flink client.");
						userParallelism = client.getMaxSlots();
					} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
						userParallelism = defaultParallelism;
					}
					//	execute the program
					executeProgram(program, client, userParallelism);
				} finally {
					if (clusterId == null && !client.isDetached()) {
						// terminate the cluster only if we have started it before and if it's not
						// detached
						try {
							client.shutDownCluster();
						} catch (final Exception e) {
							LOG.info("Could not properly terminate the Flink cluster.", e);
						}
						if (shutdownHook != null) {
							// we do not need the hook anymore as we have just tried to shutdown the
							// cluster.
							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
						}
					}
					try {
						client.shutdown();
					} catch (Exception e) {
						LOG.info("Could not properly shut down the client.", e);
					}
				}
			}
		} finally {
			try {
				clusterDescriptor.close();
			} catch (Exception e) {
				LOG.info("Could not properly close the cluster descriptor.", e);
			}
		}
	}

As the code above shows, after all this preparation, the executeProgram method is called to execute the program.

executeProgram does two things:

1. Call ClusterClient's run method to submit the job and execute the program.

2. Wait for the submission result and check whether the job was submitted successfully.

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism)
			throws ProgramMissingJobException, ProgramInvocationException {
		logAndSysout("Starting execution of program");
		//	call ClusterClient#run to submit and execute the program
		final JobSubmissionResult result = client.run(program, parallelism);
		//	wait for the submission result
		if (null == result) {
			throw new ProgramMissingJobException(
					"No JobSubmissionResult returned, please make sure you called " + "ExecutionEnvironment.execute()");
		}

		if (result.isJobExecutionResult()) {
			logAndSysout("Program execution finished");
			JobExecutionResult execResult = result.getJobExecutionResult();
			System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
			System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
			if (accumulatorsResult.size() > 0) {
				System.out.println("Accumulator Results: ");
				System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
			}
		} else {
			logAndSysout("Job has been submitted with JobID " + result.getJobID());
		}
	}

Let's continue into ClusterClient's run method (the end is in sight).

The run method distinguishes two submission modes:

1. Interactive mode

2. Non-interactive mode

Which path is taken depends on how the PackagedProgram was built: a plan-based program whose entry class exposes its plan directly (isUsingProgramEntryPoint) goes down the non-interactive path, while a program that only has a main method (isUsingInteractiveMode) is run interactively.

Submission on the non-interactive path:

(1) Create a JobWithJars object. A JobWithJars is a Flink dataflow plan bundled with all the classes from the JAR and the ClassLoader needed to load the user code.

(2) Get the job plan.

(3) Call an overload of run to perform the submission.

if (prog.isUsingProgramEntryPoint()) {
			//	a JobWithJars is a Flink dataflow plan bundled with all classes from the JAR and the ClassLoader used to load the user code
			final JobWithJars jobWithJars;
			//	get the execution plan
			if (hasUserJarsInClassPath(prog.getAllLibraries())) {
				jobWithJars = prog.getPlanWithoutJars();
			} else {
				jobWithJars = prog.getPlanWithJars();
			}
			//	call the run overload to submit
			return run(jobWithJars, parallelism, prog.getSavepointSettings());
		}

Usually, however, we submit in interactive mode, and on the interactive path only one important action is performed:

(1) Call PackagedProgram's invokeInteractiveModeForExecution method, which loads and invokes the user program's main method via reflection.

prog.invokeInteractiveModeForExecution();

As introduced earlier, PackagedProgram works on the user-specified JAR to find the program entry point, parse the user code into the job topology, and extract nested libraries.

Its invokeInteractiveModeForExecution method is very simple: it calls callMainMethod, passing in the main class and the arguments.

public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
		if (isUsingInteractiveMode()) {
			callMainMethod(mainClass, args);
		} else {
			throw new ProgramInvocationException("Cannot invoke a plan-based program directly.");
		}
	}

callMainMethod does two things:

1. Look up the program's main method on the user-specified entry class.

2. Invoke the main method via reflection, passing in the arguments.

private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
		Method mainMethod;
		if (!Modifier.isPublic(entryClass.getModifiers())) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
		}

		try {
			//	look up the main method on the entry class
			mainMethod = entryClass.getMethod("main", String[].class);
		} catch (NoSuchMethodException e) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
		}
		catch (Throwable t) {
			throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
					entryClass.getName() + ": " + t.getMessage(), t);
		}

		if (!Modifier.isStatic(mainMethod.getModifiers())) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
		}
		if (!Modifier.isPublic(mainMethod.getModifiers())) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
		}

		try {
			//	invoke the main method via reflection, passing in the arguments
			mainMethod.invoke(null, (Object) args);
		}
		catch (IllegalArgumentException e) {
			throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
		}
		catch (IllegalAccessException e) {
			throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
		}
		catch (InvocationTargetException e) {
			Throwable exceptionInMethod = e.getTargetException();
			if (exceptionInMethod instanceof Error) {
				throw (Error) exceptionInMethod;
			} else if (exceptionInMethod instanceof ProgramParametrizationException) {
				throw (ProgramParametrizationException) exceptionInMethod;
			} else if (exceptionInMethod instanceof ProgramInvocationException) {
				throw (ProgramInvocationException) exceptionInMethod;
			} else {
				throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
			}
		}
		catch (Throwable t) {
			throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
		}
	}
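
One subtlety worth illustrating is the (Object) args cast in mainMethod.invoke: without it, the String[] would be spread as varargs into separate arguments. Here is a tiny standalone demonstration of the same reflection pattern (a toy class of my own, not from Flink):

import java.lang.reflect.Method;

public class ReflectMainSketch {
	public static class Target {
		public static void main(String[] args) {
			System.out.println("got " + args.length + " args");
		}
	}

	public static void main(String[] args) throws Exception {
		Method mainMethod = Target.class.getMethod("main", String[].class);
		// The cast to Object makes the array a single argument instead of
		// being expanded into varargs (which would throw an
		// IllegalArgumentException here, since main takes one String[]).
		mainMethod.invoke(null, (Object) new String[]{"a", "b"});
	}
}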

From here on, the remaining logic must be read starting from the user program's main method as the entry point.

With that, we have read through the whole job submission flow.

The next article will continue the source reading with the classic WordCount program as the example.

See you in the next article!


If you've read this far, give the post a like!!!

