Flink WaterMark实例

Hi~

上篇文章介绍了时间窗口,Flink 时间窗口,Flink WaterMark,感兴趣请查看:Flink WaterMark简介

结合上篇,本篇文章主要是通过代码实例的方式来展示如何设置WaterMark

WaterMark的设定

由于种种原因,造成数据的乱序与延迟,在设置WaterMark时可以允许一定时间段的延迟(当然也不可能无限的等待),且在触发下一个窗口计算前,也会将Event Time进行排序,以保证数据有序

WaterMark设定方法有两种:

  • Punctuated Watermark : 数据流中每一个递增的EventTime都会产生一个Watermark

        在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成

  • Periodic Watermark : 周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark

        在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时

两种方式根据场景进行选择

设置WaterMark步骤

设置WaterMark步骤:

1.设置StreamTime Characteristic为Event Time,即设置流式时间窗口(也可以称为流式时间特性)

2.创建的DataStreamSource调用assignTimestampsAndWatermarks方法,并设置WaterMark种类:AssignerWithPeriodicWatermarks / AssignerWithPunctuatedWatermarks

或者 实现AssignerWithPeriodicWatermarks接口 / 实现AssignerWithPunctuatedWatermarks接口

3.重写getCurrentWatermark与extractTimestamp方法

getCurrentWatermark方法:获取当前的水位线

extractTimestamp方法:提取数据流中的时间戳(必须显式的指定数据中的Event Time

实例

通过一段程序,实践一下WaterMark的设定以及WaterMark的工作方式

数据示例:

key + 时间戳

hello,1553503210000 

程序说明:

1.使用Socket模拟接收数据

2.设置WaterMark

设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推

3.进行map基础转换,将String转换为Tuple2<String,String>

4.根据Key分组

5.使用滚动Event Time窗口,将5秒内的同组数据,进行Fold拼接输出

代码如下:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> dataStream = env
                .socketTextStream("10.47.1.38", 9999).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0l;
                    long maxDelayAllowed = 0l;
                    long currentWaterMark;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(",");
                        long timeStamp = Long.parseLong(arr[1]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",水位线:" + currentWaterMark);
                        return timeStamp;
                    }
                });

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                return new Tuple2<String, String>(s.split(",")[0], s.split(",")[1]);
            }
        }).keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold("Start:", new FoldFunction<Tuple2<String, String>, String>() {
                    @Override
                    public String fold(String s, Tuple2<String, String> o) throws Exception {
                        return s + " - " + o.f1;
                    }
                }).print();

        env.execute("WaterMark Test Demo");

开启9999端口,并输入第一条数据:

hello,1553503185000

那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示

1.第一条数据的Event Time为1553503185000,那么当前窗口时间为:1553503185000 -> 1553503189000,即下图中红色框线

2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1553503185000

3.第二条数据进来时,前一条数据的WaterMark为1553503185000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1553503186000,但还没到窗口触发时间,不进行计算

4.后面几个以此类推,直到Event Time为:1553503190000的数据进来的时候,前一条数据的WaterMark为1553503189000,于是更新当前的WaterMark为155350390000,Flink认为1553503190000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算


根据上面的推断,启动程序验证一下

先启动监听9999端口,再启动Flink程序,并向端口监听终端输入以下内容:

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000

Flink输出结果:

Key:hello,EventTime:1553503185000,水位线:0
Key:hello,EventTime:1553503186000,水位线:1553503185000
Key:hello,EventTime:1553503187000,水位线:1553503186000
Key:hello,EventTime:1553503188000,水位线:1553503187000
Key:hello,EventTime:1553503189000,水位线:1553503188000
Key:hello,EventTime:1553503190000,水位线:1553503189000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000

通过结果可以发现,Flink在指定WaterMark时,先调用extractTimestamp方法,再调用getCurrentWatermark方法, 所以打印信息中的WaterMark为上一条数据的WaterMark,并非当前的WaterMark

为了验证这个结论,修改一下代码:

 @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        System.out.println("当前水位线:" + currentWaterMark);
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(",");
                        long timeStamp = Long.parseLong(arr[1]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("Key:" + arr[0] + ",EventTime:" + timeStamp + ",前一条数据的水位线:" + currentWaterMark);
                        return timeStamp;
                    }

在监听终端输入同一批数据:

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000

Flink输出结果:

Key:hello,EventTime:1553503185000,前一条数据的水位线:0
当前水位线:1553503185000

Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503185000
当前水位线:1553503186000

Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503186000
当前水位线:1553503187000

Key:hello,EventTime:1553503188000,前一条数据的水位线:1553503187000
当前水位线:1553503188000

Key:hello,EventTime:1553503189000,前一条数据的水位线:1553503188000
当前水位线:1553503189000

Key:hello,EventTime:1553503190000,前一条数据的水位线:1553503189000
当前水位线:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000

通过上面的结果,验证了之前的结论,在设置WaterMark方法中,先调用extractTimestamp方法,再调用getCurrentWatermark方法

数据乱序

上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟

启动程序,在监听终端中输入如下数据:

其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,1553503187000,hello,1553503186000

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503186000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

Flink结果:

Key:hello,EventTime:1553503185000,前一条数据的水位线:0
当前水位线:1553503185000

Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503185000
当前水位线:1553503186000

Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503186000
当前水位线:1553503187000

Key:hello,EventTime:1553503188000,前一条数据的水位线:1553503187000
当前水位线:1553503188000

Key:hello,EventTime:1553503189000,前一条数据的水位线:1553503188000
当前水位线:1553503189000

Key:hello,EventTime:1553503190000,前一条数据的水位线:1553503189000
当前水位线:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000
当前水位线:1553503190000

Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503190000
当前水位线:1553503190000

Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503190000
当前水位线:1553503190000

Key:hello,EventTime:1553503191000,前一条数据的水位线:1553503190000
当前水位线:1553503191000

Key:hello,EventTime:1553503192000,前一条数据的水位线:1553503191000
当前水位线:1553503192000

Key:hello,EventTime:1553503193000,前一条数据的水位线:1553503192000
当前水位线:1553503193000

Key:hello,EventTime:1553503194000,前一条数据的水位线:1553503193000
当前水位线:1553503194000

Key:hello,EventTime:1553503195000,前一条数据的水位线:1553503194000
当前水位线:1553503195000
2> Start: - 1553503190000 - 1553503191000 - 1553503192000 - 1553503193000 - 1553503194000

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理

乱序时间的设置

为了解决上面的问题,我们允许Flink处理延迟以5秒内的迟到数据

修改最大乱序时间

long maxDelayAllowed = 5000l;

在监听终端中,输入数据

hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503186000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

Flink输出结果:

Key:hello,EventTime:1553503185000,前一条数据的水位线:-5000
当前水位线:1553503180000

Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503180000
当前水位线:1553503181000

Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503181000
当前水位线:1553503182000

Key:hello,EventTime:1553503188000,前一条数据的水位线:1553503182000
当前水位线:1553503183000

Key:hello,EventTime:1553503189000,前一条数据的水位线:1553503183000
当前水位线:1553503184000

Key:hello,EventTime:1553503190000,前一条数据的水位线:1553503184000
当前水位线:1553503185000

Key:hello,EventTime:1553503187000,前一条数据的水位线:1553503185000
当前水位线:1553503185000

Key:hello,EventTime:1553503186000,前一条数据的水位线:1553503185000
当前水位线:1553503185000

Key:hello,EventTime:1553503191000,前一条数据的水位线:1553503185000
当前水位线:1553503186000

Key:hello,EventTime:1553503192000,前一条数据的水位线:1553503186000
当前水位线:1553503187000

Key:hello,EventTime:1553503193000,前一条数据的水位线:1553503187000
当前水位线:1553503188000

Key:hello,EventTime:1553503194000,前一条数据的水位线:1553503188000
当前水位线:1553503189000

Key:hello,EventTime:1553503195000,前一条数据的水位线:1553503189000
当前水位线:1553503190000
2> Start: - 1553503185000 - 1553503186000 - 1553503187000 - 1553503188000 - 1553503189000 - 1553503187000 - 1553503186000

可以看到,设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟

关于延迟时间,请结合业务场景进行设置

至此,WaterMark实例就写完了,我们下篇文章见

亲,看完了点个赞呀!

赫墨拉

我是一个喜爱大数据的小菜鸡,这里是我分享我的成长和经历的博客

You may also like...

2 Responses

  1. younh说道:

    请问你socket的数据是一条一条输入的吗? 我批量数据的时候,水位线无法每个都定位到

发表评论

电子邮件地址不会被公开。