2.4.3 Timestamps and Watermarks
Section 1.2.4 introduced the three time semantics that Flink supports. The two most commonly used are event time and processing time, which you select with env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
or env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime).
So what is a watermark? Informally, a watermark marks the point in time up to which late data is still assigned to the preceding window for computation; it exists to deal with data that arrives late. For example, given a window 10:00:00-10:10:00, a record with timestamp 10:10:00 would ordinarily cause the window to fire; but if the allowed lag is set to 5s, the window only fires once a record with a timestamp of at least 10:10:05 arrives. The firing condition can be written as a formula:

max(event_time) - max_lag_time >= window.end
Here max(event_time) is the largest event time seen so far among the window's records, and window.end is the window's end time. Taking the same 10:00:00-10:10:00 window, i.e. window.end = 10:10:00, and max_lag_time = 5s: if a record with timestamp 10:10:00 arrives and that is the largest event time so far, then 10:10:00 - 5s = 10:09:55 < 10:10:00, so the window does not fire; but when a record with timestamp 10:10:05 arrives, 10:10:05 - 5s = 10:10:00 >= 10:10:00, and the window fires. (As the official documentation points out, timestamps and watermarks are both Long values in milliseconds; readable clock times are used here only to illustrate the mechanism.)
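As a minimal sketch of this check in code (the variable names and values are illustrative, not part of Flink's API):

// Illustrative only: all values are epoch milliseconds, as Flink requires
val windowEnd: Long = 1593051000000L     // window.end
val maxLagTime: Long = 5000L             // tolerate 5s of lag
val maxEventTime: Long = 1593050995000L  // max(event_time) seen so far
val watermark = maxEventTime - maxLagTime
val windowFires = watermark >= windowEnd // false until a record >= windowEnd + maxLagTime arrives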
In code, watermarks are assigned with the assignTimestampsAndWatermarks
method. Flink divides watermarks into two types according to how they are generated: periodic watermarks and punctuated watermarks. A periodic watermark is generated at a fixed time interval, whereas a punctuated watermark is generated from the data itself, for example once certain marker elements have appeared in the stream (see the sketch below). This section focuses on periodic watermarks. Flink ships with two built-in periodic modes: an ascending mode, which assumes event time only ever increases and no data arrives late, and an out-of-order mode, which lets you configure how much lag to tolerate.
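The examples in this section only cover the periodic style, so here is a minimal sketch of a punctuated assigner, assuming the same (String, Int, Long) records used in the examples below; treating records whose first field is "flink" as watermark markers is purely an illustrative choice:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PunctuatedAssignerSketch extends AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
  // Assign the third field as the event timestamp
  override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
    element._3

  // Emit a watermark only when a marker record arrives; returning null emits nothing
  override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark =
    if (lastElement._1 == "flink") new Watermark(extractedTimestamp) else null
}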
(1) Ascending mode
package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Test data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 * spark,40,1593050405000
 * spark,100,1593050406000
 * flink,200,1593050407000
 * flink,300,1593050410000
 */
object AscendingWatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event-time semantics

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3) // ascending mode: the third field is the event timestamp
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}
Output:
(spark,50,1593050400000)
(flink,30,1593050402000)
(spark,140,1593050405000)
(flink,200,1593050407000)
Note that the last input record is not included in the results, because window intervals are closed on the left and open on the right.
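To see why, it helps to know how a timestamp is mapped to its tumbling window: for a window size of 5s with no offset, the window start is the timestamp rounded down to a multiple of the window size. A small sketch of this alignment rule, using the last record from the test data:

// Window assignment for tumbling windows (zero offset):
// start = timestamp - (timestamp % windowSize), window = [start, start + windowSize)
val windowSize = 5000L
val ts = 1593050410000L            // the last input record
val start = ts - (ts % windowSize) // 1593050410000
// The record opens the window [1593050410000, 1593050415000), which has not fired yet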
(2) Out-of-order mode
package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Test data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 *
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050403000
 * flink,200,1593050407000
 *
 * flink,300,1593050410000
 * hadoop,20,1593050411000
 * hadoop,20,1593050412000
 */
object UnorderedWatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignTimestampsAndWatermarks(new MyWatermarkAssigner) // out-of-order mode
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}

// Tolerates up to 2 seconds of out-of-order data
class MyWatermarkAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(2)) {
  override def extractTimestamp(element: (String, Int, Long)): Long = element._3
}
Output:
(spark,60,1593050400000)
(flink,30,1593050402000)
(spark,140,1593050405000)
(flink,200,1593050407000)
(flink,300,1593050410000)
(hadoop,40,1593050411000)
Note that the last two results are only printed when the socket is closed: when the stream ends, Flink emits a final watermark of Long.MaxValue, which fires all remaining windows. Try entering the data one line at a time and watch how the output changes.
(3) Custom timestamp assigner
A custom timestamp assigner is implemented mainly through AssignerWithPeriodicWatermarks[T].
package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Test data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050403000
 * flink,200,1593050407000
 * flink,300,1593050410000
 * hadoop,20,1593050411000
 * hadoop,20,1593050412000
 */
object UserDefinedTimestampExtractor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.seconds(2)))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}

// Custom timestamp extractor: a hand-rolled equivalent of BoundedOutOfOrdernessTimestampExtractor
class MyTimestampExtractor extends AssignerWithPeriodicWatermarks[(String, Int, Long)] {
  private var lastWatermark = 0L
  private var maxTimestamp = 0L
  private var time: Long = _

  def this(time: Time) {
    this()
    this.time = time.toMilliseconds // store the lag as a plain Long; Time cannot be serialized directly
  }

  // Called periodically by Flink: the watermark is max(event_time) minus the allowed lag,
  // and it never moves backwards
  override def getCurrentWatermark: Watermark = {
    val marks = maxTimestamp - time
    if (marks >= lastWatermark) {
      lastWatermark = marks
    }
    new Watermark(lastWatermark)
  }

  // Extract the event timestamp and keep track of max(event_time)
  override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long = {
    val currentTimestamp = element._3
    if (currentTimestamp > maxTimestamp) {
      this.maxTimestamp = currentTimestamp
    }
    currentTimestamp
  }
}
Output:
(spark,60,1593050400000)
(flink,30,1593050402000)
(flink,200,1593050407000)
(spark,140,1593050405000)
(hadoop,40,1593050411000)
(flink,300,1593050410000)
Comparing (2) and (3), the results are the same, because (3) re-implements by hand what BoundedOutOfOrdernessTimestampExtractor does in (2). The implementation revolves around the getCurrentWatermark
and extractTimestamp
methods: extractTimestamp obtains the current record's timestamp, compares it against the largest timestamp seen so far to maintain the max(event_time) of the formula, and returns the current timestamp; getCurrentWatermark then computes a new watermark from max(event_time), compares it with the previous watermark, and returns whichever is newer, so the watermark never moves backwards.
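One final detail about periodic assigners: Flink calls getCurrentWatermark on a timer, whose period is the auto-watermark interval (200 ms by default once event time is enabled). A minimal sketch of tuning it, reusing the env from the examples above:

// Ask Flink to poll getCurrentWatermark every 100 ms instead of the default 200 ms
env.getConfig.setAutoWatermarkInterval(100)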