Flink——Time&Window&Watermark
主要内容:
- 介绍Time、Window、Watermark的基本概念
- 介绍几个实现案例
1 Flink时间语义
时间属性是流处理中最重要的一个方面,是流处理系统的基石之一。Flink作为一个先进的分布式流处理引擎,支持不同的时间语义:
- Event Time:事件生成时间,可根据每一条处理记录所携带的时间戳来判定。Flink通过时间戳分配器获取改时间。
- Ingestion Time:事件接入事件,指数据接入Flink DataSource的时间。
- Processing Time:事件处理时间,指执行转换操作的算子所在的服务器时间。Flink默认的时间语义。
其核心是 Processing Time 和 Event Time。
代码中选择不同时间语义的方式:
// Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Ingestion Time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// Processing Time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
2 Window
Window 是无限数据流处理的核心。通过窗口将无限数据流切割为有限数据流,然后对窗口内的数据进行聚合操作,如计算一个小时内有多少用户点击了口网站。
按照是否对DataStream中的Key进行分组,可将其分为两类:
- Keyed Window
- Non-Keyed Window
各自的应用形式如下:(其中Window Assigner和Window Function为必选)
// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中 Assigner
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
其中涉及到几种数据类型的转换:
Flink支持两种类型的窗口:(其核心是TimeWindow)
- TimeWindow:基于时间生成窗口。将制定时间范围内的所有数据组成一个window,一次对一个window里面所有的数据进行计算。如每20s生成一个窗口。
- CountWindow:基于元素数量(相同Key)生成窗口,与时间无关,如每20个元素生成一个窗口。
Window Assigner
TimeWindow根据实现原理可以分为三类:
- Tumbling Window:滚动窗口,将数据依据固定的窗口长度进行切片。特点是时间对齐、窗口长度固定、没有重复。
val inputStream: DataStream[T] = ...
val tumbling: DataStream[T] = inputStream
.keyBy(0)
// 窗口长度5s
.timeWindow(Time.minutes(5))
.process(...)
- Sliding Window:滑动窗口,由固定的窗口长度和滑动间隔组成。特点是时间对齐、窗口长度固定、有重叠。
val inputStream: DataStream[T] = ...
val sliding: DataStream[T] = inputStream
.keyBy(0)
// 窗口长度5s,滑动间隔1s,每隔1s计算前5秒的数据
.timeWindow(Time.seconds(5), Time.seconds(1))
.process(...)
- Session Window:会话窗口,在规定的时间内(Session Gap)如果没有数据接入,则认为窗口结束。特点:时间无对齐。
val inputStream: DataStream[T] = ...
val session: DataStream[T] = inputStream
.keyBy(0)
// 如果10min没有数据接入就结束当前窗口
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.process(...)
timeWindow使用
v.timeWindow()
是简写:
.window(SlidingEventTimeWindows.of(size))
.window(SlidingProcessingTimeWindows.of(size))
或:
.window(TumblingEventTimeWindows.of(size))
.window(TumblingProcessingTimeWindows.of(size))
源码中timeWindow的逻辑:
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
具体是Sliding,还是Tumbling类型窗口,根据参数个数确定:
- 一个参数:Tumbling
- 两个参数:Sliding
以上两个参数可以用
org.apache.flink.streaming.api.windowing.time.Time
中的seconds
、minutes
、hours
和days
来设置。
具体是EventTime,还是ProcessingTime时间语义,可以根据第一部分中的代码进行设置。
Window Function
对数据集定义了Window Assigner后,数据被分配到不同的窗口里,接下来通过窗口函数,在每个窗口上对窗口内的数据进行处理。窗口函数主要分为两种:
- 增量计算:指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。计算性能较高,占用存储空间较少,窗口中只维护了中间结果状态值,不需要缓存原始数据。如
RreduceFunction
、AggregateFunction
。 - 全量计算:指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。全量计算的代价较高,性能比较弱,因为算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。如
ProcessWindowFunction
。
3 Watermark
时间有一个重要的特性:只增不减
对于Processing Time:
使用转换算子所在服务器的时间,假设服务器时钟同步正常,则Processing Time就是有序的数据流。
对于Event Time:
使用绑定在record中的时间,由于网络延迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在一定程度的乱序。
当使用Event Time时,我们就要考虑如何处理这种乱序情况。处理乱序event的方式就是等待更早event到来,但是不能永久等待下去。我们就需要一种策略,何时停止等待。这就是watermark的作用:何时不再等待更早数据,开始计算窗口内的event。
watermark是一种衡量Event Time进展的机制。watermark是用于处理乱序事件的,而正确的处理乱序事件 ,通常用watermark机制结合window 来实现。可以理解为一个延迟触发机制。watermark是数据本身的一个隐藏属性,数据本身携带着对相应的watermark,因此,如果运行过程中无法获取新的数据,那么窗口将永远无法触发。watermark(t):时间戳t以前的event都已经到了,未来小于等于t的event不会再来,可以触发并销毁窗口了。
上图中,有序数据的watermark为0,无序数据中设置允许的最大延迟到达时间为2s,所以时间戳为6s的事件对应的watermark时4s,时间戳为10s的事件的watermark是8s。如果我们的窗口1是1s4s,窗口2是5s8s,那么时间戳为6s的事件到达时watermark恰好触发窗口1,时间戳为10s的事件到达时的watermark恰好触发窗口2。
注意:
- 窗口是左闭右开,形式:
[window_start_time, window_end_time)
- window的设定是系统定义好了的,window会一直按照指定的时间间隔进行划分,不论这个window中有没有数据。
- Flink会根据Event time是否在这个window期间,将数据发送到相应window中。属于这个 window 范围的数据会被不断加入到 window 中,所有未触发的window都会等待触发。只要window没有触发,属于这个window范围的数据就会一直被加入到该window,直到window被触发才会停止数据的追加。当window触发后才接受到的数据会丢失。
Flink提供两种方式指定timestamp生成watremark:
SourceFunction
:在event源头生成ctx.collectWithTimestamp(element, timestamp)
:其中,element:要发送的event,timestamp:event中的时间戳ctx.emitWatermark(new Watermark(timestamp))
: 其中,timestamp:设置最大延迟
class CustomGenerator extends SourceFunction[(Int, Long)] {
private var running = true
override def run(ctx: SourceFunction.SourceContext[(Int, Long)]): Unit = {
// 随机数生成器
val randomNum: Random = new Random()
while (running) {
val n: (Int, Long) = (randomNum.nextInt(3), new Date().getTime())
// 利用ctx上下文将数据返回
// 设定timestamp
ctx.collectWithTimestamp(n, n._2)
// 生成watermark 最大延时10ms
ctx.emitWatermark(new Watermark(n._2 - 10))
Thread.sleep(500)
}
}
override def cancel(): Unit = {
running = false
}
}
-
Timestamp Assigner
:Flink自带,根据生成方式分两种:-
Periodic Watermark
:周期性地生成,两种实现:Ascending Timestamp
:升序模式,适用于顺序event:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) socketData.map(line => { (line.split(",")(0), line.split(",")(1).toInt) }) // 指定时间字段 .assignAscendingTimestamps(_._2) .keyBy(_._1) .timeWindow(Time.minutes(3)) .sum(0) .print()
Bounded-out-of-orderness
:固定延时间隔
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) socketData.map(line => { (line.split(",")(0), line.split(",")(1).toInt) }) // 指定时间字段 .assignTimestampsAndWatermarks(new MyAssigner) .peocess(...) // 参数为延迟时间 class MyAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int)](Time.seconds(3)) { // 指定时间字段 override def extractTimestamp(element: (String, Int)): Long = { element._2 } }
-
Punctuated Watermark
:根据接入event的数量生成
-