Skip to content

优化flink的多流操作(例如join),优化点不限于数据丢失问题,以及性能问题

License

Notifications You must be signed in to change notification settings

zengxiaosen/flinkMultiStreamOptimization

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

14 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

基于Flink多流Join优化的研究与实现

1 伪代码
2 单流场景下的TimeWindow滚动窗口边界与数据延迟问题
3 多流Join场景下的窗口计算触发时机、延时数据丢失问题
4 针对flink流算子中rpc调用场景,利用netty自研rpc工具

1 伪代码:

Flink stream join的形式为Windows join

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

2 单流场景下的TimeWindow滚动窗口边界与数据延迟问题

2.1 问题陈述

多流Join的思路是在同一窗口对多流进行Join,针对每条单流:
每条流都是使用Flink的timeWindow api中的window size、delay、timestamp,计算触发窗口计算的时机,
每条流的延时数据,Flink根据window size、delay、延时数据的timestamp,判断是否丢弃,
本节通过调节windows size、delay,分析触发窗口计算的条件,以及触发延时数据丢失的条件。

2.2 数据所属窗口计算逻辑

Flink源码中,数据所属窗口的计算逻辑:

//Flink源码的窗口计算函数,该函数根据每条数据的timestamp、window size计算该条数据所属的[窗口开始时间,窗口结束时间]
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}

测试:根据event time和窗口时间大小,计算数据所属的窗口的开始时间和结束时间
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test1()

//结果展示:
1000000050000 -> 2001-09-09 09:47:30.000	所属窗口的开始时间是:1000000050000 -> 2001-09-09 09:47:30.000
1000000054000 -> 2001-09-09 09:47:34.000	所属窗口的起始时间是: 1000000050000 -> 2001-09-09 09:47:30.000
1000000079900 -> 2001-09-09 09:47:59.900	所属窗口的起始时间是: 1000000070000 -> 2001-09-09 09:47:50.000
1000000120000 -> 2001-09-09 09:48:40.000	所属窗口的起始时间是: 1000000120000 -> 2001-09-09 09:48:40.000
1000000111000 -> 2001-09-09 09:48:31.000	所属窗口的起始时间是: 1000000110000 -> 2001-09-09 09:48:30.000
1000000089000 -> 2001-09-09 09:48:09.000	所属窗口的起始时间是: 1000000080000 -> 2001-09-09 09:48:00.000

2.3 单流的窗口计算触发时机

代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test2()
数据源:

Tuple3[] elements = new Tuple3[]{
                    Tuple3.of("a", "1", 1000000050000L),
                    Tuple3.of("a", "2", 1000000054000L),
                    Tuple3.of("a", "3", 1000000079900L),
                    Tuple3.of("a", "4", 1000000120000L),
                    Tuple3.of("b", "5", 1000000111000L),
                    Tuple3.of("b", "6", 1000000089000L)
            };

窗口属性设置:

long delay = 5000L;
int windowSize = 10;

水位线计算逻辑:

//水位线的目标是使水位线以下的record触发窗口计算
private final long maxOutOfOrderness = delay;
private long currentMaxTimestamp = 0L;

@Nullable
@Override
public Watermark getCurrentWatermark() {
    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}

针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志

#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000045000 -> 2001-09-09 09:47:25.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000049000 -> 2001-09-09 09:47:29.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074900 -> 2001-09-09 09:47:54.900
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000120000 -> 2001-09-09 09:48:40.000
窗口结束时间:1000000130000 -> 2001-09-09 09:48:50.000
4 -> 1000000120000 -> 2001-09-09 09:48:40.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
5 -> 1000000111000 -> 2001-09-09 09:48:31.000
#### 第 6 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000080000 -> 2001-09-09 09:48:00.000
窗口结束时间:1000000090000 -> 2001-09-09 09:48:10.000
6 -> 1000000089000 -> 2001-09-09 09:48:09.000
(b,5,1000000111000)
(a,4,1000000120000)

日志分析:

/**
                     * 触发窗口运算时机:
                     * 当一条数据过来,
                     * 1)水位线 > 上一批次的记录的窗口结束时间,之前的数据要进行窗口运算
                     * 2)水位线 > 上一批次的记录的timestamp,之前的数据要进行窗口计算
                     *
                     * 关于是否丢数据:
                     * 1)如果当前数据的EventTime在WaterMark之上,也就是EventTime > WaterMark。由于数据所属窗口
                     * 的WindowEndTime,一定是大于EventTime的。这时有WindowEndTime > EventTime > WaterMark
                     * 这种情况是一定不会丢数据的。
                     * 2)如果当前数据的EventTime在WaterMark之下,也就是WaterMark > EventTime,这时要分两种情况:
                     *  2.1)如果该数据所属窗口的WindowEndTime > WaterMark,表示窗口还没被触发,例如第5个record的情况,
                     *  即WindowEndTime > WaterMark > EventTime,这种情况数据也是不会丢失的。
                     *  2.2)如果该数据所属窗口的WaterMark > WindowEndTime, 则表示窗口已经无法被触发,
                     *  即WaterMark > WindowEndTime > EventTime, 这种情况数据也就丢失了。
                     *
                     * 特殊record:
                     * 第5条record,元素在水位以下,但windows还没被触发计算,参照record 5
                     * 第6条record,由于watermark > windows end time ,第6条数据所属的窗口就永远不会被触发计算了。
                     */

2.4 单流的窗口计算数据丢失场景

窗口延时数据丢失情况:元素在水位以下,但windows已经无法被触发计算了
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test3()
数据源:

Tuple3[] elements = new Tuple3[]{
                    Tuple3.of("a", "1", 1000000050000L),
                    Tuple3.of("a", "2", 1000000054000L),
                    Tuple3.of("a", "3", 1000000079900L),
                    Tuple3.of("a", "4", 1000000120000L),
                    Tuple3.of("b", "5", 1000000100001L),
                    Tuple3.of("b", "6", 1000000109000L)
            };

窗口属性设置:

long delay = 5000L;
int windowSize = 10;

针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:

#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000045000 -> 2001-09-09 09:47:25.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000049000 -> 2001-09-09 09:47:29.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074900 -> 2001-09-09 09:47:54.900
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000120000 -> 2001-09-09 09:48:40.000
窗口结束时间:1000000130000 -> 2001-09-09 09:48:50.000
4 -> 1000000120000 -> 2001-09-09 09:48:40.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100001 -> 2001-09-09 09:48:20.001
#### 第 6 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000109000 -> 2001-09-09 09:48:29.000
(a,4,1000000120000)

日志分析:

/**
     * 观察record 5 和 record 6,它们的窗口属性如下:
     * 窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
     * 窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
     * windows end time < watermark, 这个窗口已经无法被触发计算了。
     * 也就是说,这个窗口创建时,已经 windows end time < watermark,相当于第5第6条记录都丢失了。
     */

2.5 针对单流延时数据的丢失问题,提出增大delay的解决方案

解决思路:通过增大delay,来增大失序的容忍程度,确保不丢数据
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test4()
数据源:

Tuple3[] elements = new Tuple3[]{
                    Tuple3.of("a", "1", 1000000050000L),
                    Tuple3.of("a", "2", 1000000054000L),
                    Tuple3.of("a", "3", 1000000079900L),
                    Tuple3.of("a", "4", 1000000115000L),
                    Tuple3.of("b", "5", 1000000100000L),
                    Tuple3.of("b", "6", 1000000108000L)
            };

窗口属性设置:

long delay = 5100L;
int windowSize = 10;

针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:

#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000044900 -> 2001-09-09 09:47:24.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000048900 -> 2001-09-09 09:47:28.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074800 -> 2001-09-09 09:47:54.800
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
4 -> 1000000115000 -> 2001-09-09 09:48:35.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100000 -> 2001-09-09 09:48:20.000
#### 第 6 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000108000 -> 2001-09-09 09:48:28.000
(b,[5,6],1)
(a,4,1000000115000)

日志分析:

/**
     * 观察 record 5 和 record 6, 它们的时间窗口如下:
     * 窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
     * 窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
     * 它们进来的时候水位线如下:
     * 水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
     * 也就是说,它们进来的时候,watermark < windows end time
     * 这种情况下,就算数据的 eventtime < watermark,数据还是被保留下来,没有丢失。
     */

3 多流Join场景下的窗口计算触发时机、延时数据丢失问题

代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/StreamJoinDemo.java
数据源:两条流

//StreamJoinDataSource1:
Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L),
                Tuple3.of("a", "2", 1000000054000L),
                Tuple3.of("a", "3", 1000000079900L),
                Tuple3.of("a", "4", 1000000115000L),
                Tuple3.of("b", "5", 1000000100000L),
                Tuple3.of("b", "6", 1000000108000L)
        };
//StreamJoinDataSource2:
Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L),
                Tuple3.of("b", "beijing", 1000000105000L),
        };

窗口属性设置:

//毫秒为单位
int windowSize = 10;
long delay = 5100L;

针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:

####################################
element.f1: hangzhou
水位线(watermark): 1000000053900 -> 2001-09-09 09:47:33.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
hangzhou -> 1000000059000 -> 2001-09-09 09:47:39.000
####################################
element.f1: 1
水位线(watermark): 1000000044900 -> 2001-09-09 09:47:24.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
####################################
element.f1: 2
水位线(watermark): 1000000048900 -> 2001-09-09 09:47:28.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
####################################
element.f1: beijing
水位线(watermark): 1000000099900 -> 2001-09-09 09:48:19.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
beijing -> 1000000105000 -> 2001-09-09 09:48:25.000
####################################
element.f1: 3
水位线(watermark): 1000000074800 -> 2001-09-09 09:47:54.800
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
触发双流join窗口运算
(a,1,hangzhou,1000000050000,1000000059000)
触发双流join窗口运算
(a,2,hangzhou,1000000054000,1000000059000)
####################################
element.f1: 4
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
4 -> 1000000115000 -> 2001-09-09 09:48:35.000
####################################
element.f1: 5
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100000 -> 2001-09-09 09:48:20.000
####################################
element.f1: 6
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000108000 -> 2001-09-09 09:48:28.000
触发双流join窗口运算
(b,5,beijing,1000000100000,1000000105000)
触发双流join窗口运算
(b,6,beijing,1000000108000,1000000105000)

日志分析:

结论1:如果source1触发窗口计算的时候,source2还没有触发窗口计算,也就是说,source2在窗口中没有数据,
需要等待source2触发窗口计算,把数据放置到窗口中,才能进行基于多流的join操作。
结论2:假设缩小delay,也就是提升水位线,有可能导致watermark > window end time,导致丢数据,例子:
/**
         * 当设置参数int windowSize = 10; long delay = 5000L;时
         * 输出为:
         * (a,1,hangzhou,1000000050000,1000000059000)
         * (a,2,hangzhou,1000000054000,1000000059000)
         * 原因:
         * window_end_time < watermark, 导致数据丢失了。
         */

4 针对flink流算子中rpc调用场景,利用netty自研rpc工具

server端启动: https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/rpc/demo/DemoServer.java
flink的算子flatmap中初始化client端,调用rpc服务: https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
中的test6()

调用流程:
1)一般flink算子中调用rpc都是在每个task上去建立连接,调用,销毁连接。  
2)在flatmap之上统一new rpc client不可行,因为此rpc使用netty实现,而netty的bootstrap是final类型,也不能序列化。  
3)所以综上所述,需要在每个task上去调用rpc服务。 

About

优化flink的多流操作(例如join),优化点不限于数据丢失问题,以及性能问题

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages