Reading the Flink Source Code (3): Submitting a Flink Job
Picking up from the previous article: the client pattern-matches on the "action"; if it is run, the corresponding logic is executed:
case ACTION_RUN:
    run(params);
    return 0;
In this article, we keep following the run method and see what lies inside.

The run method executes the "run" action and mainly does the following:
1. Obtain the configuration.
2. Parse the command-line arguments; if they do not conform to the rules, print Flink's usage help.
3. Check whether the user-specified JAR path is empty; if it is, print the error and throw a CliArgsException. The pattern matching upstream catches the exception and handles it by printing the help message:
private static int handleArgException(CliArgsException e) {
    LOG.error("Invalid command line arguments.", e);

    System.out.println(e.getMessage());
    System.out.println();
    System.out.println("Use the help option (-h or --help) to get help on the command.");
    return 1;
}
If an exception occurs in step 2 or 3, the output looks like the example in the figure below.
4. Create a PackagedProgram object. The org.apache.flink.client.program.PackagedProgram class operates on the user-specified JAR to:
(1) find the program entry point;
(2) parse the user code to obtain the job topology;
(3) extract nested libraries.
Here it is used to find the user's program entry point (a sketch of the manifest lookup involved follows the run method's code below).
(Key) 5. Call the runProgram method, passing in the program entry point and configuration prepared above, and run the program.
6. Delete the temporary files created for packaging.
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Prepare the configuration
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
    final RunOptions runOptions = new RunOptions(commandLine);

    // evaluate help flag
    // Parse the command-line arguments; if they do not conform to the rules, print the help message
    if (runOptions.isPrintHelp()) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // Check whether the user-specified JAR file path is empty; if so, throw a CliArgsException
    if (runOptions.getJarFilePath() == null) {
        throw new CliArgsException("The program JAR file was not specified.");
    }

    // Create the PackagedProgram object
    final PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        program = buildProgram(runOptions);
    } catch (FileNotFoundException e) {
        throw new CliArgsException("Could not build the program from JAR file.", e);
    }

    final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);

    try {
        // Call the runProgram method
        runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
        // Finally, delete the temporary files created for packaging
        program.deleteExtractedLibraries();
    }
}
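As promised above, here is a minimal sketch of the kind of manifest lookup PackagedProgram performs when no entry class is given on the command line. This is illustrative only, not the actual Flink code; the class and method names below are made up. Flink does consult a Flink-specific "program-class" manifest attribute before falling back to the standard "Main-Class" attribute.

import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper, for illustration only (not the actual PackagedProgram code)
public class EntryPointLookup {
    public static String findEntryPoint(String jarPath) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                throw new IllegalStateException("The JAR file has no manifest: " + jarPath);
            }
            Attributes attrs = manifest.getMainAttributes();
            // Flink checks its own "program-class" attribute first...
            String entryClass = attrs.getValue("program-class");
            if (entryClass == null) {
                // ...and then falls back to the standard "Main-Class" attribute
                entryClass = attrs.getValue(Attributes.Name.MAIN_CLASS);
            }
            if (entryClass == null) {
                throw new IllegalStateException("No entry point found in the manifest.");
            }
            return entryClass.trim();
        }
    }
}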
Now let's look at the runProgram method:
The code is fairly long, so here is a summary first:
1. Create a ClusterDescriptor from the user's command-line arguments. A ClusterDescriptor describes the properties of a cluster; it is used to deploy a cluster (e.g. on YARN or Mesos) and returns a client for communicating with it.
2. Get the cluster id.
3. Create a ClusterClient. A ClusterClient encapsulates the functionality needed to submit a program to a remote cluster.
4. If the clusterId is null and the run mode is detached (i.e. the -d flag was passed on the command line):
A quick note on detached mode: in detached mode the client submits the job and exits right away, instead of staying attached until the job finishes.
(1) Determine the parallelism.
(2) Parse the user program into the DAG, i.e. the JobGraph.
Before reading Flink's DAG-building code, let's hazard a guess: Flink's DAG parsing, submission, and resource allocation may well resemble Spark's.
One difference is that Flink has no split between action and transformation operators the way Spark does; instead, computation in Flink is triggered by env.execute(), which plays a role similar to a Spark action (see the sketch after the executeProgram code below), and Flink's DAG topology is partitioned along operators.
Whether the two really are similar, later articles will show.
(3) Get the cluster specification from the command line.
(4) Pass the parsed JobGraph, the cluster specification, the run mode, and so on into clusterDescriptor.deployJobCluster, obtain a ClusterClient, and run the program.
(5) Shut down the client.
Note that the JobGraph is generated here only in detached mode; in normal (attached) mode it is not generated at this point. How the DAG is generated will be covered in a later article.
5. If the condition in step 4 is not met, submit the normal (attached) way:
(1) Create a shutdownHook for shutting down the cluster; in non-detached mode, the shutdownHook is needed to shut the cluster down after the client exits.
(2) Check whether the cluster id is null: if it is not null, retrieve the client of the existing cluster; if it is null, deploy a new session cluster and register the hook that shuts it down after the client exits.
(3) Determine the parallelism.
(4) Call the executeProgram method to execute the program.
6. Wait for the program to run, then shut down the client and the cluster.
private <T> void runProgram(
        CustomCommandLine<T> customCommandLine,
        CommandLine commandLine,
        RunOptions runOptions,
        PackagedProgram program) throws ProgramInvocationException, FlinkException {

    final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

    try {
        // Get the clusterId
        final T clusterId = customCommandLine.getClusterId(commandLine);

        // Create the ClusterClient, which encapsulates the functionality
        // needed to submit a program to a remote cluster
        final ClusterClient<T> client;

        // directly deploy the job if the cluster is started in job mode and detached
        if (clusterId == null && runOptions.getDetachedMode()) {
            // Determine the parallelism, falling back to the default
            int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

            // Parse the user program into the DAG topology (the JobGraph)
            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

            // Cluster specification
            final ClusterSpecification clusterSpecification = customCommandLine
                .getClusterSpecification(commandLine);
            // The client deploys the job to the cluster
            client = clusterDescriptor.deployJobCluster(
                clusterSpecification,
                jobGraph,
                runOptions.getDetachedMode());

            logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

            try {
                client.shutdown();
            } catch (Exception e) {
                LOG.info("Could not properly shut down the client.", e);
            }
        } else {
            final Thread shutdownHook;
            if (clusterId != null) {
                client = clusterDescriptor.retrieve(clusterId);
                shutdownHook = null;
            } else {
                final ClusterSpecification clusterSpecification = customCommandLine
                    .getClusterSpecification(commandLine);
                client = clusterDescriptor.deploySessionCluster(clusterSpecification);
                if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
                    shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
                } else {
                    shutdownHook = null;
                }
            }

            try {
                client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
                client.setDetached(runOptions.getDetachedMode());
                LOG.debug("Client slots is set to {}", client.getMaxSlots());

                LOG.debug("{}", runOptions.getSavepointRestoreSettings());

                int userParallelism = runOptions.getParallelism();
                LOG.debug("User parallelism is set to {}", userParallelism);
                if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) {
                    logAndSysout("Using the parallelism provided by the remote cluster ("
                        + client.getMaxSlots() + "). "
                        + "To use another parallelism, set it at the ./bin/flink client.");
                    userParallelism = client.getMaxSlots();
                } else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
                    userParallelism = defaultParallelism;
                }

                // Execute the program
                executeProgram(program, client, userParallelism);
            } finally {
                if (clusterId == null && !client.isDetached()) {
                    // terminate the cluster only if we have started it before and if it's not
                    // detached
                    try {
                        client.shutDownCluster();
                    } catch (final Exception e) {
                        LOG.info("Could not properly terminate the Flink cluster.", e);
                    }
                    if (shutdownHook != null) {
                        // we do not need the hook anymore as we have just tried to shutdown the
                        // cluster.
                        ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
                    }
                }

                try {
                    client.shutdown();
                } catch (Exception e) {
                    LOG.info("Could not properly shut down the client.", e);
                }
            }
        }
    } finally {
        try {
            clusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the cluster descriptor.", e);
        }
    }
}
As the code above shows, after a pile of preparation work, the executeProgram method is called to execute the program.
executeProgram does two things:
1. Call ClusterClient's run method to submit the job and execute the program.
2. Wait for the submission result and check whether the job was submitted successfully.
protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

    // Call ClusterClient's run method to execute the program
    final JobSubmissionResult result = client.run(program, parallelism);

    // Wait for the submission result
    if (null == result) {
        throw new ProgramMissingJobException(
            "No JobSubmissionResult returned, please make sure you called " +
            "ExecutionEnvironment.execute()");
    }

    if (result.isJobExecutionResult()) {
        logAndSysout("Program execution finished");
        JobExecutionResult execResult = result.getJobExecutionResult();
        System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
        System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
        Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
        if (accumulatorsResult.size() > 0) {
            System.out.println("Accumulator Results: ");
            System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
        }
    } else {
        logAndSysout("Job has been submitted with JobID " + result.getJobID());
    }
}
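The ProgramMissingJobException above also ties back to the earlier comparison with Spark actions: nothing is submitted until the user program calls env.execute(). Here is a minimal sketch of a user program (the class name and job name are made up for illustration) showing where that trigger sits:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch; "MinimalJob" and the job name are hypothetical
public class MinimalJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Building the pipeline only constructs the dataflow graph; nothing runs yet
        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .print();

        // Like a Spark action, this call triggers DAG construction and job submission;
        // without it, client.run(...) returns no JobSubmissionResult
        env.execute("minimal-job");
    }
}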
Moving on to ClusterClient's run method (we can see the light now).
The run method supports two submission modes:
1. Interactive mode
2. Non-interactive mode
For non-interactive submission:
(1) Create a JobWithJars object. A JobWithJars is a Flink dataflow plan together with all the JARs that contain the program's classes and the ClassLoader used to load the user code.
(2) Get the job plan (with or without the JARs, depending on whether the user JARs are already on the classpath).
(3) Call the run overload to submit it.
if (prog.isUsingProgramEntryPoint()) {
    // A JobWithJars is a Flink dataflow plan with all the JARs containing
    // the program's classes and the ClassLoader used to load the user code
    final JobWithJars jobWithJars;

    // Get the execution plan
    if (hasUserJarsInClassPath(prog.getAllLibraries())) {
        jobWithJars = prog.getPlanWithoutJars();
    } else {
        jobWithJars = prog.getPlanWithJars();
    }

    // Call the run overload
    return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
In most cases, though, submission happens in interactive mode, and there only one important thing is done:
(1) Call PackagedProgram's invokeInteractiveModeForExecution method, which loads and invokes the user program's main method via reflection.
prog.invokeInteractiveModeForExecution();
As introduced above, PackagedProgram operates on the user-specified JAR to:
(1) find the program entry point;
(2) parse the user code to obtain the job topology;
(3) extract nested libraries.
Its invokeInteractiveModeForExecution method is simple: it calls callMainMethod, passing in the main class and the arguments.
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    if (isUsingInteractiveMode()) {
        callMainMethod(mainClass, args);
    } else {
        throw new ProgramInvocationException("Cannot invoke a plan-based program directly.");
    }
}
callMainMethod does two things:
1. Look up the main method on the user-specified entry class.
2. Invoke the main method via reflection, passing in the arguments (a standalone sketch of this follows the method's code below).
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    Method mainMethod;
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
    }

    try {
        // Look up the main method on the entry class
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
    } catch (Throwable t) {
        throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
            entryClass.getName() + ": " + t.getMessage(), t);
    }

    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
    }
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
    }

    try {
        // Invoke the main method via reflection, passing in the arguments
        // (the cast to Object keeps the String[] from being expanded into varargs)
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
    } catch (IllegalAccessException e) {
        throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
    } catch (InvocationTargetException e) {
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
        }
    } catch (Throwable t) {
        throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
    }
}
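To make the (Object) args cast above concrete, here is a self-contained, hypothetical sketch (the UserJob class is a made-up stand-in for a user entry class) of invoking a main method reflectively:

import java.lang.reflect.Method;

public class ReflectMainDemo {
    // Stand-in for a user entry class, for illustration only
    public static class UserJob {
        public static void main(String[] args) {
            System.out.println("got " + args.length + " argument(s)");
        }
    }

    public static void main(String[] unused) throws Exception {
        Method mainMethod = UserJob.class.getMethod("main", String[].class);

        String[] args = {"--input", "/tmp/in"};
        // Without the cast to Object, the String[] would be expanded into
        // varargs and invoke() would see two arguments instead of one
        mainMethod.invoke(null, (Object) args);
    }
}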
From here on, the reading continues inside the user program, with its main method as the entry point.
That completes our walk through the job-submission path.
The next article will continue the source reading using the classic WordCount program as the example.
See you in the next article~
If you've read this far, please give the post a like!!!