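Flink Source Code Reading (2): Preparing for Flink Job Submission

The previous article briefly introduced Flink's submission script and entry point, and took a first look at the main method of org.apache.flink.client.cli.CliFrontend.

Based on the input arguments, the main method submits the job. Before submitting, it performs the following steps: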

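1. Call getConfigurationDirectoryFromEnv to obtain the configuration directory

2. Call GlobalConfiguration.loadConfiguration to load the global configuration

3. Call loadCustomCommandLines to load the custom command lines (the CustomCommandLine implementations that handle command-line options)

4. Create the CliFrontend object. Its constructor performs the following steps:

(1) Check that the configuration and the custom command lines are not null

(2) Initialize the file system configuration

(3) Collect the options defined by each custom command line and register them in the client's option set

(4) Set the client timeout

(5) Set the default parallelism

5. Call SecurityUtils.install to install the security modules

6. Use a switch statement on the command-line action, execute the corresponding "action", and obtain its return code

7. Use the return code to exit the submission program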

In this article, we'll dig into what each of these steps does before Flink submits a job.

1. Call getConfigurationDirectoryFromEnv to obtain the configuration directory

final String configurationDirectory = getConfigurationDirectoryFromEnv();

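This calls the static method getConfigurationDirectoryFromEnv of CliFrontend.

The method is simple: it reads the environment variable FLINK_CONF_DIR, which points to Flink's configuration directory.

The Flink submission script calls config.sh, which works out Flink's configuration directory and sets it as an environment variable.

If FLINK_CONF_DIR is set, the method checks that the directory exists and returns its path, throwing a RuntimeException otherwise. If the variable is not set, it tries the two fallback directories CONFIG_DIRECTORY_FALLBACK_1 and CONFIG_DIRECTORY_FALLBACK_2 in turn, and fails with a RuntimeException if neither exists.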

public static String getConfigurationDirectoryFromEnv() {
		String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

		if (location != null) {
			if (new File(location).exists()) {
				return location;
			}
			else {
				throw new RuntimeException("The configuration directory '" + location + "', specified in the '" +
					ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
			}
		}
		else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
			location = CONFIG_DIRECTORY_FALLBACK_1;
		}
		else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
			location = CONFIG_DIRECTORY_FALLBACK_2;
		}
		else {
			throw new RuntimeException("The configuration directory was not specified. " +
					"Please specify the directory containing the configuration file through the '" +
				ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
		}
		return location;
	}
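To make the lookup order concrete, here is a minimal, self-contained sketch of the same logic. The class ConfDirLookup and its resolve method are hypothetical: the environment variable value and the two fallback paths are passed in as parameters (instead of calling System.getenv and using CliFrontend's CONFIG_DIRECTORY_FALLBACK_1/2 constants), so the logic can be exercised without touching the real environment.

```java
import java.io.File;

// Hypothetical sketch of the lookup order in getConfigurationDirectoryFromEnv.
class ConfDirLookup {

    static String resolve(String envValue, String fallback1, String fallback2) {
        // 1. An explicitly set FLINK_CONF_DIR wins, but it must exist.
        if (envValue != null) {
            if (new File(envValue).exists()) {
                return envValue;
            }
            throw new RuntimeException("The configuration directory '" + envValue
                    + "', specified in the 'FLINK_CONF_DIR' environment variable, does not exist.");
        }
        // 2. Otherwise try the fallback directories in order.
        if (new File(fallback1).exists()) {
            return fallback1;
        }
        if (new File(fallback2).exists()) {
            return fallback2;
        }
        // 3. Nothing found: fail loudly instead of guessing.
        throw new RuntimeException("The configuration directory was not specified.");
    }

    public static void main(String[] args) {
        // Reads the real variable; "../conf" and "conf" are placeholder fallback paths.
        try {
            System.out.println(resolve(System.getenv("FLINK_CONF_DIR"), "../conf", "conf"));
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note the asymmetry the sketch preserves: a wrong FLINK_CONF_DIR is an error right away, it never silently falls through to the fallback paths.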

2. Call GlobalConfiguration.loadConfiguration to load the global configuration

In the second step, the static method GlobalConfiguration.loadConfiguration is called, which returns the configuration:

final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

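GlobalConfiguration is the utility class that loads Flink's global configuration, which is essentially the key-value pairs in Flink's configuration file.

This method simply delegates to an overload of loadConfiguration: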

public static Configuration loadConfiguration(final String configDir) {
		return loadConfiguration(configDir, null);
	}

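The overload does the following:

1. It checks that the configuration directory is not null and exists, then locates the configuration file, i.e. Flink's flink-conf.yaml, throwing an IllegalConfigurationException if the file does not exist

2. Once the file is found, it calls loadYAMLResource to parse the YAML file and returns a Configuration that holds the settings as key-value pairs (backed by a HashMap)

3. If dynamicProperties is not null, they are added on top of the file settings with addAll (here null is passed, so this step is skipped)

The returned Configuration instance is shown in the figure below.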


public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {

		if (configDir == null) {
			throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
		}

		final File confDirFile = new File(configDir);
		if (!(confDirFile.exists())) {
			throw new IllegalConfigurationException(
				"The given configuration directory name '" + configDir +
					"' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
		}

		// get Flink yaml configuration file
		final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

		if (!yamlConfigFile.exists()) {
			throw new IllegalConfigurationException(
				"The Flink config file '" + yamlConfigFile +
					"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
		}

		Configuration configuration = loadYAMLResource(yamlConfigFile);

		if (dynamicProperties != null) {
			configuration.addAll(dynamicProperties);
		}

		return configuration;
	}
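loadYAMLResource itself is not shown above. As far as I can tell from the Flink 1.x sources, it does not use a full YAML library but reads flink-conf.yaml line by line as flat "key: value" pairs. The sketch below illustrates that idea under this assumption, together with the dynamicProperties merge. FlatYamlSketch is hypothetical and uses a plain Map<String, String> instead of Flink's Configuration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for loadYAMLResource + dynamicProperties merging.
class FlatYamlSketch {

    // Parse flat "key: value" lines; comments start with '#'.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String line : lines) {
            // Drop everything after the first '#', then trim.
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            // Split on the first ": " only, so values may contain colons (e.g. URIs).
            String[] kv = content.split(": ", 2);
            if (kv.length < 2) {
                System.err.println("Skipping invalid line: " + line);
                continue;
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                System.err.println("Skipping invalid line: " + line);
                continue;
            }
            conf.put(key, value);
        }
        return conf;
    }

    // Mirrors loadConfiguration(configDir, dynamicProperties): file values first,
    // then dynamic properties on top (a later put overwrites an earlier one).
    static Map<String, String> load(List<String> lines, Map<String, String> dynamicProperties) {
        Map<String, String> conf = parse(lines);
        if (dynamicProperties != null) {
            conf.putAll(dynamicProperties);
        }
        return conf;
    }
}
```

A consequence of this flat format is that nested YAML structures are not supported; every setting is a single line with a dotted key such as parallelism.default.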

3. Call loadCustomCommandLines to load the custom command lines

loadCustomCommandLines is called to obtain the list of custom command lines:

final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
			configuration,
			configurationDirectory);

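loadCustomCommandLines works as follows:

1. Create a list

2. Decide which session modes are available: first try to load the YARN session CLI (FlinkYarnSessionCli) and, if the class can be loaded, add it to the list; then always append the default standalone CLI (DefaultCLI). As the code comment explains, DefaultCLI must come last, because getActiveCustomCommandLine checks the command lines in order and DefaultCLI is always active

This is similar to how Spark decides at startup which backend (mode) to use

3. Return the list. The objects in it hold several properties:

(1) the ZooKeeper namespace option, used for reliability in Flink's high-availability mode

(2) addressOption, the JobManager address option

(3) the Flink configuration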

public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
		List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

		//	Command line interface of the YARN session, with a special initialization here
		//	to prefix all options with y/yarn.
		//	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
		//	      active CustomCommandLine in order and DefaultCLI isActive always return true.
		final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
		try {
			customCommandLines.add(
				loadCustomCommandLine(flinkYarnSessionCLI,
					configuration,
					configurationDirectory,
					"y",
					"yarn"));
		} catch (NoClassDefFoundError | Exception e) {
			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
		}

		customCommandLines.add(new DefaultCLI(configuration));

		return customCommandLines;
	}
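The comment in the code explains why DefaultCLI is appended last: the active command line is the first one in the list whose isActive check passes, and DefaultCLI is always active. The sketch below reproduces that "first active wins" selection and the tolerant class loading. Cli, YarnLikeCli, DefaultLikeCli, load, and getActive are hypothetical names; the real interface is CustomCommandLine, and the "-m yarn-cluster" rule is only a simplified stand-in for the YARN CLI's actual activation check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of loadCustomCommandLines + "first active command line wins".
class ActiveCliSketch {

    // Stand-in for CustomCommandLine.
    interface Cli {
        String id();

        boolean isActive(List<String> args);
    }

    // Simplified YARN-like CLI: active only for "-m yarn-cluster" (illustrative rule).
    static class YarnLikeCli implements Cli {
        public String id() {
            return "yarn-cluster";
        }

        public boolean isActive(List<String> args) {
            int i = args.indexOf("-m");
            return i >= 0 && i + 1 < args.size() && args.get(i + 1).equals("yarn-cluster");
        }
    }

    // Like DefaultCLI: always active, so it must be the last entry.
    static class DefaultLikeCli implements Cli {
        public String id() {
            return "default";
        }

        public boolean isActive(List<String> args) {
            return true;
        }
    }

    static List<Cli> load(String yarnCliClassName) {
        List<Cli> clis = new ArrayList<>(2);
        try {
            // Like the reflective load: skip the YARN CLI if its class is not on the classpath.
            Class.forName(yarnCliClassName);
            clis.add(new YarnLikeCli());
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            System.err.println("Could not load CLI class " + yarnCliClassName);
        }
        clis.add(new DefaultLikeCli());
        return clis;
    }

    // Returns the first command line whose isActive check passes.
    static Cli getActive(List<Cli> clis, List<String> args) {
        for (Cli cli : clis) {
            if (cli.isActive(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No active command line found.");
    }
}
```

If the always-active default were placed first, it would shadow every other command line, which is exactly what the ordering rule in the Flink comment prevents.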

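4. Create the CliFrontend object by calling its constructor

The constructor performs the following steps:

1. Call Preconditions.checkNotNull

Preconditions is modeled on Google Guava's Preconditions class. It validates arguments to guard against invalid input and to make sure the code behaves as expected.

Flink keeps its own copy of this class (org.apache.flink.util.Preconditions) instead of depending on Guava, in order to reduce external dependencies.

The checkNotNull method used here checks that an object reference is not null, throwing a NullPointerException if it is, and otherwise returns the reference.

It is applied to configuration and to customCommandLines.

2. After these checks, call FileSystem.initialize to initialize the file systems

Flink abstracts file systems through the FileSystem class. This abstraction provides the common operations and minimal guarantees shared by all file system implementations.

initialize is a static method that sets up the shared file system settings.

Flink's configuration file also lets us configure the file system; in practice this is usually HDFS.

Inside initialize:

(1) It clears the cached file system instances and factories, then iterates over all registered file system factories (RAW_FACTORIES), configures each with the passed configuration, and registers it under its scheme (wrapped by ConnectionLimitingFactory if connection limits are configured for that scheme)

(2) It configures the fallback factory (FALLBACK_FACTORY) with the same configuration

(3) It reads the default file system scheme (CoreOptions.DEFAULT_FILESYSTEM_SCHEME); if it is set, it is parsed as a URI, and an invalid value raises an IllegalConfigurationException

3. Iterate over the custom command lines (customCommandLines) and add each one's general options and run options to customCommandLineOptions

4. Get the client timeout, 60000 ms by default

5. Get the default parallelism, which defaults to 1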

The FileSystem.initialize code is as follows:

public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
		LOCK.lock();
		try {
			// make sure file systems are re-instantiated after re-configuration
			CACHE.clear();
			FS_FACTORIES.clear();

			// configure all file system factories
			for (FileSystemFactory factory : RAW_FACTORIES) {
				factory.configure(config);
				String scheme = factory.getScheme();

				FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
				FS_FACTORIES.put(scheme, fsf);
			}

			// configure the default (fallback) factory
			FALLBACK_FACTORY.configure(config);

			// also read the default file system scheme
			final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
			if (stringifiedUri == null) {
				defaultScheme = null;
			}
			else {
				try {
					defaultScheme = new URI(stringifiedUri);
				}
				catch (URISyntaxException e) {
					throw new IllegalConfigurationException("The default file system scheme ('" +
							CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e);
				}
			}
		}
		finally {
			LOCK.unlock();
		}
	}
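At its core, initialize rebuilds a scheme-to-factory registry under a lock and records an optional default scheme that applies to paths without one. The sketch below models just that. FsRegistrySketch, Factory, and factoryFor are hypothetical; connection limiting and the fallback factory are left out, and using the local "file" scheme when no default is configured is an assumption made for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of FileSystem.initialize: a scheme -> factory registry plus a default scheme.
class FsRegistrySketch {

    // Stand-in for FileSystemFactory: only the scheme matters here.
    interface Factory {
        String scheme();
    }

    private static final ReentrantLock LOCK = new ReentrantLock(true);
    private static final Map<String, Factory> FACTORIES = new HashMap<>();
    private static URI defaultScheme;

    static void initialize(List<Factory> rawFactories, String defaultSchemeString) {
        LOCK.lock();
        try {
            // Re-initialization starts from a clean registry.
            FACTORIES.clear();
            for (Factory factory : rawFactories) {
                FACTORIES.put(factory.scheme(), factory);
            }
            // Optional default scheme, validated as a URI.
            if (defaultSchemeString == null) {
                defaultScheme = null;
            } else {
                try {
                    defaultScheme = new URI(defaultSchemeString);
                } catch (URISyntaxException e) {
                    throw new IllegalArgumentException(
                            "Invalid default file system scheme: " + defaultSchemeString, e);
                }
            }
        } finally {
            LOCK.unlock();
        }
    }

    // Paths without a scheme use the default scheme, or "file" if none is set
    // (an assumption of this sketch). Returns null for an unknown scheme.
    static Factory factoryFor(URI path) {
        LOCK.lock();
        try {
            String scheme = path.getScheme();
            if (scheme == null) {
                scheme = defaultScheme != null ? defaultScheme.getScheme() : "file";
            }
            return FACTORIES.get(scheme);
        } finally {
            LOCK.unlock();
        }
    }
}
```

This is why setting a default scheme such as hdfs://namenode:8020/ lets jobs refer to plain paths like /tmp/data and still reach HDFS.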

The CliFrontend constructor code is as follows:

public CliFrontend(
			Configuration configuration,
			List<CustomCommandLine<?>> customCommandLines) throws Exception {
		this.configuration = Preconditions.checkNotNull(configuration);
		this.customCommandLines = Preconditions.checkNotNull(customCommandLines);

		try {
			FileSystem.initialize(this.configuration);
		} catch (IOException e) {
			throw new Exception("Error while setting the default " +
				"filesystem scheme from configuration.", e);
		}

		this.customCommandLineOptions = new Options();

		for (CustomCommandLine<?> customCommandLine : customCommandLines) {
			customCommandLine.addGeneralOptions(customCommandLineOptions);
			customCommandLine.addRunOptions(customCommandLineOptions);
		}

		this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
		this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
	}
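Stripped of Flink types, the constructor follows a common pattern: fail fast on null arguments, let every command line contribute its options to one shared set, and read settings with defaults. The sketch below assumes a Map<String, String> configuration and uses the JDK's Objects.requireNonNull in place of Flink's Preconditions.checkNotNull (both throw a NullPointerException on null). FrontendSketch, CommandLine, and the client.timeout.ms key are hypothetical; parallelism.default is the Flink key behind CoreOptions.DEFAULT_PARALLELISM, and the defaults mirror the 60000 ms and 1 mentioned above.

```java
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, Flink-free sketch of the CliFrontend constructor's steps.
class FrontendSketch {

    // Stand-in for CustomCommandLine: contributes option names to a shared set.
    interface CommandLine {
        void addGeneralOptions(Set<String> options);

        default void addRunOptions(Set<String> options) {
            // no run-specific options by default
        }
    }

    final Map<String, String> configuration;
    final List<CommandLine> commandLines;
    final Set<String> options = new LinkedHashSet<>();
    final Duration clientTimeout;
    final int defaultParallelism;

    FrontendSketch(Map<String, String> configuration, List<CommandLine> commandLines) {
        // Step 1: fail fast on null arguments.
        this.configuration = Objects.requireNonNull(configuration, "configuration");
        this.commandLines = Objects.requireNonNull(commandLines, "commandLines");

        // Step 3: every command line registers its general and run options.
        for (CommandLine commandLine : commandLines) {
            commandLine.addGeneralOptions(options);
            commandLine.addRunOptions(options);
        }

        // Steps 4 and 5: read settings, falling back to defaults.
        this.clientTimeout = Duration.ofMillis(
                Long.parseLong(configuration.getOrDefault("client.timeout.ms", "60000")));
        this.defaultParallelism =
                Integer.parseInt(configuration.getOrDefault("parallelism.default", "1"));
    }
}
```

Registering all options up front matters because the parser must know every option any command line might accept before it sees the user's arguments.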

5. Call SecurityUtils.install to install the security modules

The class org.apache.flink.runtime.security.SecurityConfiguration holds Flink's global security configuration; we'll skip this code for now.

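6. Use a switch statement on the command-line action, execute the corresponding "action", and obtain its return code

CliFrontend.parseParameters is called inside the installed security context; it parses the command-line arguments and dispatches on the action: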

int retCode = SecurityUtils.getInstalledContext()
					.runSecured(() -> cli.parseParameters(args));
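runSecured is what lets parseParameters run under whatever identity the installed security modules set up. The sketch below is a hypothetical, simplified model: a context interface with one generic runSecured(Callable) method, a no-op implementation that just invokes the callable, and an illustrative wrapper that records the identity it would run as. My understanding is that Flink's no-op context behaves like the first one, while its Hadoop/Kerberos context wraps the call in the login user's doAs; the class names here are my own.

```java
import java.util.concurrent.Callable;

// Hypothetical model of a security context that runs code "as" some identity.
class SecuredRunSketch {

    interface Context {
        <T> T runSecured(Callable<T> action) throws Exception;
    }

    // No security configured: just run the action directly.
    static final class NoOpContext implements Context {
        @Override
        public <T> T runSecured(Callable<T> action) throws Exception {
            return action.call();
        }
    }

    // Illustrative wrapper: records which identity the action would run as.
    static final class RecordingContext implements Context {
        final String user;
        String lastUser;

        RecordingContext(String user) {
            this.user = user;
        }

        @Override
        public <T> T runSecured(Callable<T> action) throws Exception {
            lastUser = user; // a real Kerberos context would switch identity here
            return action.call();
        }
    }
}
```

Because the action is passed as a Callable, the return code of parseParameters flows straight back through the security wrapper to main.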

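The parseParameters code is as follows:

It matches on the "action". In my case the action is run, so it calls the run method to execute the program (this is the key part).

It returns 0 when the action completes normally, and 1 when no action is given or the action is not recognized; exceptions thrown by an action are passed to the handlers (handleArgException, handleError, and so on), which print the error and return an error code.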

public int parseParameters(String[] args) {

		// check for action
		if (args.length < 1) {
			CliFrontendParser.printHelp(customCommandLines);
			System.out.println("Please specify an action.");
			return 1;
		}

		// get action
		String action = args[0];

		// remove action from parameters
		final String[] params = Arrays.copyOfRange(args, 1, args.length);

		try {
			// do action
			switch (action) {
				case ACTION_RUN:
					run(params);
					return 0;
				case ACTION_LIST:
					list(params);
					return 0;
				case ACTION_INFO:
					info(params);
					return 0;
				case ACTION_CANCEL:
					cancel(params);
					return 0;
				case ACTION_STOP:
					stop(params);
					return 0;
				case ACTION_SAVEPOINT:
					savepoint(params);
					return 0;
				case ACTION_MODIFY:
					modify(params);
					return 0;
				case "-h":
				case "--help":
					CliFrontendParser.printHelp(customCommandLines);
					return 0;
				case "-v":
				case "--version":
					String version = EnvironmentInformation.getVersion();
					String commitID = EnvironmentInformation.getRevisionInformation().commitId;
					System.out.print("Version: " + version);
					System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
					return 0;
				default:
					System.out.printf("\"%s\" is not a valid action.\n", action);
					System.out.println();
					System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
					System.out.println();
					System.out.println("Specify the version option (-v or --version) to print Flink version.");
					System.out.println();
					System.out.println("Specify the help option (-h or --help) to get help on the command.");
					return 1;
			}
		} catch (CliArgsException ce) {
			return handleArgException(ce);
		} catch (ProgramParametrizationException ppe) {
			return handleParametrizationException(ppe);
		} catch (ProgramMissingJobException pmje) {
			return handleMissingJobException();
		} catch (Exception e) {
			return handleError(e);
		}
	}
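The dispatch logic boils down to a table from action names to handlers. The sketch below is hypothetical (DispatchSketch and its run/list/cancel handlers are illustrative stubs) and uses a Map lookup instead of the switch statement, while keeping the same contract as parseParameters: strip the action from args, return 0 on success, and return 1 for a missing or unknown action or an invalid argument.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical table-driven version of parseParameters' action dispatch.
class DispatchSketch {

    private final Map<String, Consumer<String[]>> actions = new LinkedHashMap<>();
    // Records what the stub handlers did, so the behavior can be checked.
    final StringBuilder log = new StringBuilder();

    DispatchSketch() {
        actions.put("run", params -> log.append("run ").append(String.join(" ", params)));
        actions.put("list", params -> log.append("list"));
        actions.put("cancel", params -> {
            if (params.length == 0) {
                // Plays the role of CliArgsException for a missing job id.
                throw new IllegalArgumentException("Missing job id");
            }
            log.append("cancel ").append(params[0]);
        });
    }

    int parseParameters(String[] args) {
        // No action given: fail.
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action is passed to the handler.
        String[] params = Arrays.copyOfRange(args, 1, args.length);

        Consumer<String[]> handler = actions.get(action);
        if (handler == null) {
            System.out.printf("\"%s\" is not a valid action.%n", action);
            return 1;
        }
        try {
            handler.accept(params);
            return 0;
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            return 1;
        }
    }
}
```

A map makes the set of actions data that can be listed or extended at runtime; Flink's switch over constants is simpler and keeps each action's special cases (such as -h and -v) inline.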

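7. Based on step 6, obtain the return code and shut down the client (main passes the code to System.exit)

That completes the source reading for the submission preparation. The next article will read the run method. See you then~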


If you found this helpful, please give it a like!!

赫墨拉

I'm a big-data enthusiast and still a beginner; this blog is where I share my growth and experiences.
