2.4.8 Handling late data (allowed lateness)
Under event-time semantics, some records can arrive very late. Because the watermark triggers a window's computation once it passes the window end, such late records are dropped by default. If a maximum allowed lateness is configured, however, and a record arrives after the watermark has passed the window end but not yet the window end plus the allowed lateness, Flink still adds the record to the window, and by default each such late record triggers the window computation again.
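The timing rules above can be reduced to simple arithmetic. The following standalone sketch (plain Scala, not the Flink API, and ignoring Flink's 1 ms window-max-timestamp adjustment) computes, for the parameters used in the listing below (5 s tumbling windows, 2 s out-of-orderness bound, 3 s allowed lateness), the event time at which a window first fires and the event time after which its records land in the side output:

```scala
// Arithmetic sketch of the lateness thresholds, assuming the watermark is
// (max event time seen so far) - outOfOrderness. All values in milliseconds.
object LatenessThresholds {
  val windowSize = 5000L       // timeWindow(Time.seconds(5))
  val outOfOrderness = 2000L   // BoundedOutOfOrdernessTimestampExtractor delay
  val allowedLateness = 3000L  // allowedLateness(Time.seconds(3))

  def windowEnd(start: Long): Long = start + windowSize

  // The window first fires once the watermark reaches the window end, i.e.
  // once an event with timestamp >= windowEnd + outOfOrderness has arrived.
  def firstFireEventTime(start: Long): Long =
    windowEnd(start) + outOfOrderness

  // Window state is kept (and late arrivals re-trigger the window) until the
  // watermark passes windowEnd + allowedLateness; records arriving after an
  // event with this timestamp go to the side output instead.
  def dropEventTime(start: Long): Long =
    windowEnd(start) + allowedLateness + outOfOrderness

  def main(args: Array[String]): Unit = {
    val start = 1593050400000L  // first window in the sample data
    println(firstFireEventTime(start)) // 1593050407000
    println(dropEventTime(start))      // 1593050410000
  }
}
```

For the first window in the sample data, [1593050400000, 1593050405000), this says the window fires when the record with timestamp 1593050407000 arrives, which matches the output shown below the listing.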
package com.littlely.latedataops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sample input:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 *
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050390000
 * flink,200,1593050407000
 * spark,10,1593050310000
 */
object ProcessLateData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Tag for records that arrive too late even for the allowed lateness
    val lateOutputTag: OutputTag[(String, Int, Long)] = new OutputTag[(String, Int, Long)]("late-data")

    val text = env.socketTextStream("localhost", 9999)
    val res = text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignTimestampsAndWatermarks(new MyWatermarkAssigner)
      .keyBy(0)
      .timeWindow(Time.seconds(5))       // 5 s tumbling event-time window
      .allowedLateness(Time.seconds(3))  // keep window state 3 s past the window end
      .sideOutputLateData(lateOutputTag) // records later than that go to the side output
      .sum(1)

    res.print()
    res.getSideOutput(lateOutputTag).map(_.toString() + " is late").print()

    env.execute()
  }
}

// Watermark = max event time seen so far minus 2 s
class MyWatermarkAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(2)) {
  override def extractTimestamp(element: (String, Int, Long)): Long = element._3
}
Output:
(spark,10,1593050390000) is late
(spark,50,1593050400000)
(flink,10,1593050402000)
(spark,10,1593050310000) is late
Compare this with the output produced when no lateness handling is configured. Also be careful when importing the OutputTag class: it must be the Scala one from org.apache.flink.streaming.api.scala, not the Java org.apache.flink.util.OutputTag.
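To see why exactly the records with timestamps 1593050390000 and 1593050310000 end up in the side output, the lateness check can be simulated without Flink. This is a standalone sketch, not Flink's implementation: it tracks the watermark as (max timestamp seen so far) - 2 s, assumes the periodic watermark has caught up before each new element arrives, and ignores the 1 ms window-max-timestamp adjustment:

```scala
// Simulate which records of the sample input are windowed normally and which
// are routed to the side output ("late"), under the listing's parameters.
object LateDataSimulation {
  val windowSize = 5000L
  val outOfOrderness = 2000L
  val allowedLateness = 3000L

  // Start of the tumbling window a timestamp falls into
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSize)

  // Label each timestamp "windowed" or "late". A record is late when the
  // watermark has already passed its window's end plus the allowed lateness.
  def classify(timestamps: Seq[Long]): Seq[(Long, String)] = {
    var maxTs = Long.MinValue
    timestamps.map { ts =>
      val watermark =
        if (maxTs == Long.MinValue) Long.MinValue else maxTs - outOfOrderness
      val windowEnd = windowStart(ts) + windowSize
      val label =
        if (windowEnd + allowedLateness <= watermark) "late" else "windowed"
      maxTs = math.max(maxTs, ts)
      (ts, label)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      1593050400000L, 1593050401000L, 1593050402000L,
      1593050405000L, 1593050406000L, 1593050390000L,
      1593050407000L, 1593050310000L)
    classify(data).foreach(println)
  }
}
```

Running this labels exactly the two records from the output above as "late": when 1593050390000 arrives, the watermark is already at 1593050404000, well past its window's end (1593050395000) plus the 3 s lateness.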