Flink源码阅读(一):Flink的提交方式与入口

现在工作使用Flink框架进行开发,遂阅读一下Flink的相关源码
一些著名的大数据框架,虽然在架构设计上不同,但在其思想,设计过程中,会有相同之处,先尝试使用之前阅读Spark的源码思路,进行阅读Flink源码
若有不同,会进行更改

Spark源码阅读系列可以点这里

那么,今天先来阅读一下Flink如何从命令行提交程序以及提交程序的入口

Flink版本:1.8.0

Flink提交方式

我们使用Flink提交Jar包时,一般有两种方式

1.使用Flink Web UI进行提交

2.使用命令行flink脚本进行提交

我通过脚本方式进行提交,那么先看来提交的脚本

./flink run

flink脚本内容如下

# 获取当前脚本名
target="$0"
iteration=0
# 判断脚本名是否符合规则
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# 将相对路径转换为绝对路径
bin=`dirname "$target"`

# 调用config.sh脚本,获取flink配置
. "$bin"/config.sh
# 给FLINK_IDENT_STRING变量赋值,如果为空则为当前用户
if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

# 设置日志输出
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# 添加环境变量
export FLINK_ROOT_DIR
export FLINK_CONF_DIR

# 添加HADOOP_CLASSPATH 以允许使用HDFS  
# 使用exec启用新进程,调用java方法org.apache.flink.client.cli.CliFrontend 
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@" 

这个脚本不长,其中做了2个主要的操作

1.调用config.sh脚本,这个脚本的主要作用是加载FLINK的ClassPath,加载Flink配置

2.调用exec,调用java类org.apache.flink.client.cli.CliFrontend作为程序的入口,并将命令行的输入作为参数


这是执行flink run脚本后,最后真正执行的命令实例:

exec /opt/jdk1.8.0_211/bin/java -Dlog.file=/opt/flink-1.8.0/log/flink-root-client-louisvv.log -Dlog4j.configuration=file:/opt/flink-1.8.0/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.8.0/conf/logback.xml -classpath /opt/flink-1.8.0/lib/log4j-1.2.17.jar:/opt/flink-1.8.0/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.8.0/lib/flink-dist_2.12-1.8.0.jar::: org.apache.flink.client.cli.CliFrontend run


划重点:

1.加载Flink classpath

-classpath /opt/flink-1.8.0/lib/log4j-1.2.17.jar:/opt/flink-1.8.0/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.8.0/lib/flink-dist_2.12-1.8.0.jar:::

2.调用程序入口org.apache.flink.client.cli.CliFrontend类,参数为run

org.apache.flink.client.cli.CliFrontend run


来看程序的入口org.apache.flink.client.cli.CliFrontend类的Main函数,代码如下:

public static void main(final String[] args) {
		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

		//	获取配置文件目录
		final String configurationDirectory = getConfigurationDirectoryFromEnv();

		//	加载全局配置
		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

		//	加载用户输入命令行
		final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
			configuration,
			configurationDirectory);

		try {
			//	创建CliFrontend对象,调用内部构造方法
			final CliFrontend cli = new CliFrontend(
				configuration,
				customCommandLines);

			//      加载、安装安全模块
			SecurityUtils.install(new SecurityConfiguration(cli.configuration));
                        //      根据命令行参数进行匹配,运行程序,获取程序执行的运行码
			int retCode = SecurityUtils.getInstalledContext()
					.runSecured(() -> cli.parseParameters(args));
			System.exit(retCode);
		}
		catch (Throwable t) {
			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
			LOG.error("Fatal error while running command line interface.", strippedThrowable);
			strippedThrowable.printStackTrace();
			System.exit(31);
		}
	}

看完了代码,总结一下,主函数做了啥

主函数根据输入参数,提交Job,在提交Job之前做了如下几个操作

1.调用getConfigurationDirectoryFromEnv方法,获取配置文件目录

2.调用GlobalConfiguration的loadConfiguration方法加载全局配置

3.调用loadCustomCommandLines方法加载用户输入命令行参数

4.创建CliFrontend对象,调用内部构造方法,内部构造方法中,有如下几个步骤

(1)用于检查配置、命令行是否为空

(2)初始化文件系统配置

(3)解析命令行命令相关配置参数 ,并添加到执行配置中

(4)设置客户端超时时间

(5)这是默认的并行度

5.调用SecurityUtils.install方法,用于加载安全配置模块

6.根据命令行参数进行Switch case 匹配,执行对应的”动作”,获取“动作”执行后的返回值

7.获取执行返回值,关闭提交程序


由于时间有限,下篇文章将会将主函数中的每个步骤源代码进行阅读

下篇文章见~

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

发表评论

电子邮件地址不会被公开。