1 伪代码
2 单流场景下的TimeWindow滚动窗口边界与数据延迟问题
3 多流Join场景下的窗口计算触发时机、延时数据丢失问题
4 针对flink流算子中rpc调用场景,利用netty自研rpc工具
Flink stream join的形式为Windows join
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
多流Join的思路是在同一窗口对多流进行Join,针对每条单流:
每条流都是使用Flink的timeWindow api中的window size、delay、timestamp,计算触发窗口计算的时机,
每条流的延时数据,Flink根据window size、delay、延时数据的timestamp,判断是否丢弃,
本节通过调节windows size、delay,分析触发窗口计算的条件,以及触发延时数据丢失的条件。
Flink源码中,数据所属窗口的计算逻辑:
//Flink源码的窗口计算函数,该函数根据每条数据的timestamp、window size计算该条数据所属的[窗口开始时间,窗口结束时间]
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
测试:根据event time和窗口时间大小,计算数据所属的窗口的开始时间和结束时间
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test1()
//结果展示:
1000000050000 -> 2001-09-09 09:47:30.000 所属窗口的开始时间是:1000000050000 -> 2001-09-09 09:47:30.000
1000000054000 -> 2001-09-09 09:47:34.000 所属窗口的起始时间是: 1000000050000 -> 2001-09-09 09:47:30.000
1000000079900 -> 2001-09-09 09:47:59.900 所属窗口的起始时间是: 1000000070000 -> 2001-09-09 09:47:50.000
1000000120000 -> 2001-09-09 09:48:40.000 所属窗口的起始时间是: 1000000120000 -> 2001-09-09 09:48:40.000
1000000111000 -> 2001-09-09 09:48:31.000 所属窗口的起始时间是: 1000000110000 -> 2001-09-09 09:48:30.000
1000000089000 -> 2001-09-09 09:48:09.000 所属窗口的起始时间是: 1000000080000 -> 2001-09-09 09:48:00.000
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test2()
数据源:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "1", 1000000050000L),
Tuple3.of("a", "2", 1000000054000L),
Tuple3.of("a", "3", 1000000079900L),
Tuple3.of("a", "4", 1000000120000L),
Tuple3.of("b", "5", 1000000111000L),
Tuple3.of("b", "6", 1000000089000L)
};
窗口属性设置:
long delay = 5000L;
int windowSize = 10;
水位线计算逻辑:
//水位线的目标是使水位线以下的record触发窗口计算
private final long maxOutOfOrderness = delay;
private long currentMaxTimestamp = 0L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志
#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000045000 -> 2001-09-09 09:47:25.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000049000 -> 2001-09-09 09:47:29.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074900 -> 2001-09-09 09:47:54.900
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000120000 -> 2001-09-09 09:48:40.000
窗口结束时间:1000000130000 -> 2001-09-09 09:48:50.000
4 -> 1000000120000 -> 2001-09-09 09:48:40.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
5 -> 1000000111000 -> 2001-09-09 09:48:31.000
#### 第 6 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000080000 -> 2001-09-09 09:48:00.000
窗口结束时间:1000000090000 -> 2001-09-09 09:48:10.000
6 -> 1000000089000 -> 2001-09-09 09:48:09.000
(b,5,1000000111000)
(a,4,1000000120000)
日志分析:
/**
* 触发窗口运算时机:
* 当一条数据过来,
* 1)水位线 > 上一批次的记录的窗口结束时间,之前的数据要进行窗口运算
* 2)水位线 > 上一批次的记录的timestamp,之前的数据要进行窗口计算
*
* 关于是否丢数据:
* 1)如果当前数据的EventTime在WaterMark之上,也就是EventTime > WaterMark。由于数据所属窗口
* 的WindowEndTime,一定是大于EventTime的。这时有WindowEndTime > EventTime > WaterMark
* 这种情况是一定不会丢数据的。
* 2)如果当前数据的EventTime在WaterMark之下,也就是WaterMark > EventTime,这时要分两种情况:
* 2.1)如果该数据所属窗口的WindowEndTime > WaterMark,表示窗口还没被触发,例如第5个record的情况,
* 即WindowEndTime > WaterMark > EventTime,这种情况数据也是不会丢失的。
* 2.2)如果该数据所属窗口的WaterMark > WindowEndTime, 则表示窗口已经无法被触发,
* 即WaterMark > WindowEndTime > EventTime, 这种情况数据也就丢失了。
*
* 特殊record:
* 第5条record,元素在水位以下,但windows还没被触发计算,参照record 5
* 第6条record,由于watermark > windows end time ,第6条数据所属的窗口就永远不会被触发计算了。
*/
窗口延时数据丢失情况:元素在水位以下,但windows已经无法被触发计算了
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test3()
数据源:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "1", 1000000050000L),
Tuple3.of("a", "2", 1000000054000L),
Tuple3.of("a", "3", 1000000079900L),
Tuple3.of("a", "4", 1000000120000L),
Tuple3.of("b", "5", 1000000100001L),
Tuple3.of("b", "6", 1000000109000L)
};
窗口属性设置:
long delay = 5000L;
int windowSize = 10;
针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:
#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000045000 -> 2001-09-09 09:47:25.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000049000 -> 2001-09-09 09:47:29.000
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074900 -> 2001-09-09 09:47:54.900
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000120000 -> 2001-09-09 09:48:40.000
窗口结束时间:1000000130000 -> 2001-09-09 09:48:50.000
4 -> 1000000120000 -> 2001-09-09 09:48:40.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100001 -> 2001-09-09 09:48:20.001
#### 第 6 个record ####
currentMaxTimestamp: 1000000120000
水位线(watermark): 1000000115000 -> 2001-09-09 09:48:35.000
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000109000 -> 2001-09-09 09:48:29.000
(a,4,1000000120000)
日志分析:
/**
* 观察record 5 和 record 6,它们的窗口属性如下:
* 窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
* 窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
* windows end time < watermark, 这个窗口已经无法被触发计算了。
* 也就是说,这个窗口创建时,已经 windows end time < watermark,相当于第5第6条记录都丢失了。
*/
解决思路:通过增大delay,来增大失序的容忍程度,确保不丢数据
代码位置:https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
文件中的test4()
数据源:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "1", 1000000050000L),
Tuple3.of("a", "2", 1000000054000L),
Tuple3.of("a", "3", 1000000079900L),
Tuple3.of("a", "4", 1000000115000L),
Tuple3.of("b", "5", 1000000100000L),
Tuple3.of("b", "6", 1000000108000L)
};
窗口属性设置:
long delay = 5100L;
int windowSize = 10;
针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:
#### 第 1 个record ####
currentMaxTimestamp: 1000000050000
水位线(watermark): 1000000044900 -> 2001-09-09 09:47:24.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
#### 第 2 个record ####
currentMaxTimestamp: 1000000054000
水位线(watermark): 1000000048900 -> 2001-09-09 09:47:28.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
#### 第 3 个record ####
currentMaxTimestamp: 1000000079900
水位线(watermark): 1000000074800 -> 2001-09-09 09:47:54.800
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
(a,[1,2],1)
#### 第 4 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
4 -> 1000000115000 -> 2001-09-09 09:48:35.000
(a,3,1000000079900)
#### 第 5 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100000 -> 2001-09-09 09:48:20.000
#### 第 6 个record ####
currentMaxTimestamp: 1000000115000
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000108000 -> 2001-09-09 09:48:28.000
(b,[5,6],1)
(a,4,1000000115000)
日志分析:
/**
* 观察 record 5 和 record 6, 它们的时间窗口如下:
* 窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
* 窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
* 它们进来的时候水位线如下:
* 水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
* 也就是说,它们进来的时候,watermark < windows end time
* 这种情况下,就算数据的 eventtime < watermark,数据还是被保留下来,没有丢失。
*/
//StreamJoinDataSource1:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "1", 1000000050000L),
Tuple3.of("a", "2", 1000000054000L),
Tuple3.of("a", "3", 1000000079900L),
Tuple3.of("a", "4", 1000000115000L),
Tuple3.of("b", "5", 1000000100000L),
Tuple3.of("b", "6", 1000000108000L)
};
//StreamJoinDataSource2:
Tuple3[] elements = new Tuple3[]{
Tuple3.of("a", "hangzhou", 1000000059000L),
Tuple3.of("b", "beijing", 1000000105000L),
};
窗口属性设置:
//毫秒为单位
int windowSize = 10;
long delay = 5100L;
针对流的每条record,跟踪水位线,窗口开始时间,窗口结束时间,时间戳等日志:
####################################
element.f1: hangzhou
水位线(watermark): 1000000053900 -> 2001-09-09 09:47:33.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
hangzhou -> 1000000059000 -> 2001-09-09 09:47:39.000
####################################
element.f1: 1
水位线(watermark): 1000000044900 -> 2001-09-09 09:47:24.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
1 -> 1000000050000 -> 2001-09-09 09:47:30.000
####################################
element.f1: 2
水位线(watermark): 1000000048900 -> 2001-09-09 09:47:28.900
窗口开始时间:1000000050000 -> 2001-09-09 09:47:30.000
窗口结束时间:1000000060000 -> 2001-09-09 09:47:40.000
2 -> 1000000054000 -> 2001-09-09 09:47:34.000
####################################
element.f1: beijing
水位线(watermark): 1000000099900 -> 2001-09-09 09:48:19.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
beijing -> 1000000105000 -> 2001-09-09 09:48:25.000
####################################
element.f1: 3
水位线(watermark): 1000000074800 -> 2001-09-09 09:47:54.800
窗口开始时间:1000000070000 -> 2001-09-09 09:47:50.000
窗口结束时间:1000000080000 -> 2001-09-09 09:48:00.000
3 -> 1000000079900 -> 2001-09-09 09:47:59.900
触发双流join窗口运算
(a,1,hangzhou,1000000050000,1000000059000)
触发双流join窗口运算
(a,2,hangzhou,1000000054000,1000000059000)
####################################
element.f1: 4
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000110000 -> 2001-09-09 09:48:30.000
窗口结束时间:1000000120000 -> 2001-09-09 09:48:40.000
4 -> 1000000115000 -> 2001-09-09 09:48:35.000
####################################
element.f1: 5
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
5 -> 1000000100000 -> 2001-09-09 09:48:20.000
####################################
element.f1: 6
水位线(watermark): 1000000109900 -> 2001-09-09 09:48:29.900
窗口开始时间:1000000100000 -> 2001-09-09 09:48:20.000
窗口结束时间:1000000110000 -> 2001-09-09 09:48:30.000
6 -> 1000000108000 -> 2001-09-09 09:48:28.000
触发双流join窗口运算
(b,5,beijing,1000000100000,1000000105000)
触发双流join窗口运算
(b,6,beijing,1000000108000,1000000105000)
日志分析:
结论1:如果source1触发窗口计算的时候,source2还没有触发窗口计算,也就是说,source2在窗口中没有数据,
需要等待source2触发窗口计算,把数据放置到窗口中,才能进行基于多流的join操作。
结论2:假设缩小delay,也就是提升水位线,有可能导致watermark > window end time,导致丢数据,例子:
/**
* 当设置参数int windowSize = 10; long delay = 5000L;时
* 输出为:
* (a,1,hangzhou,1000000050000,1000000059000)
* (a,2,hangzhou,1000000054000,1000000059000)
* 原因:
* window_end_time < watermark, 导致数据丢失了。
*/
server端启动:
https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/rpc/demo/DemoServer.java
flink的算子flatmap中初始化client端,调用rpc服务:
https://github.com/zengxiaosen/flinkMultiStreamOptimization/blob/master/src/main/java/com/z/flinkStreamOptimizatiion/stream/TimeWindowDemo.java
中的test6()
调用流程:
1)一般flink算子中调用rpc都是在每个task上去建立连接,调用,销毁连接。
2)在flatmap之上统一new rpc client不可行,因为此rpc使用netty实现,而netty的bootstrap是final类型,也不能序列化。
3)所以综上所述,需要在每个task上去调用rpc服务。