2.4.8 Handling Late Data (Allowed Lateness)

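With event-time semantics, some records may arrive very late. A window fires as soon as the watermark passes its end, so by default any record for that window that arrives afterwards is dropped. If a maximum lateness is configured with allowedLateness, Flink keeps the window for that long after its end. Suppose a record arrives after the watermark has passed the window end but before it has passed the window end plus the allowed lateness. Flink still adds that record to the window, and by default each such late record triggers the window computation again, emitting an updated result. Records that arrive even later are dropped, or sent to a side output if sideOutputLateData is set.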
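
The rule above can be written as a small pure function, without Flink. This is a minimal sketch, assuming tumbling event-time windows with zero offset and Flink's default event-time trigger. The object LatenessRule, the Verdict values and classify are hypothetical names, not Flink API. A window covers [start, start + size), so its last timestamp is start + size - 1. The comparisons are intended to mirror the window operator's behaviour.

```scala
// Hypothetical helper, not part of Flink: classifies one record against the
// current watermark for tumbling event-time windows (zero offset).
object LatenessRule {
  sealed trait Verdict
  case object OnTime extends Verdict         // window has not fired yet
  case object LateButAllowed extends Verdict // window already fired; record is added and the window fires again
  case object Dropped extends Verdict        // window state already purged; record is dropped (or side-output)

  // start of the tumbling window that contains ts
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  def classify(ts: Long, watermark: Long, size: Long, lateness: Long): Verdict = {
    val maxTs = windowStart(ts, size) + size - 1 // last timestamp belonging to the window
    if (maxTs + lateness <= watermark) Dropped   // past window end + allowed lateness
    else if (maxTs <= watermark) LateButAllowed  // past window end, within the allowed lateness
    else OnTime
  }
}
```

Take the 5 s windows and 3 s lateness used below, with a watermark of 1593050404000 (largest timestamp seen, 1593050406000, minus 2 s). Under these values the record spark,10,1593050390000 is classified as Dropped. Its window's last timestamp is 1593050394999, and adding 3000 does not exceed the watermark.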

package com.littlely.latedataops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sample input, one record per line: word,count,event timestamp (ms)
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 *
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050390000
 * flink,200,1593050407000
 * spark,10,1593050310000
 */
object ProcessLateData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

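    // records arriving after the allowed lateness has expired are sent to this side output
    val lateOutputTag: OutputTag[(String, Int, Long)] = new OutputTag[(String, Int, Long)]("late-data")

    // one record per line, e.g. "spark,20,1593050400000"
    val text = env.socketTextStream("localhost", 9999)
    val res = text
      .map(_.trim.toLowerCase.split(","))
      // "".split(",") returns Array(""), so check the field count to skip blank or malformed lines
      .filter(_.length == 3)
      .map(p => (p(0).trim, p(1).trim.toInt, p(2).trim.toLong))
      .assignTimestampsAndWatermarks(new MyWatermarkAssigner) // event time = 3rd field
      .keyBy(0)                           // key by word
      .timeWindow(Time.seconds(5))        // 5 s tumbling event-time windows
      .allowedLateness(Time.seconds(3))   // keep window state 3 s (event time) past the window end
      .sideOutputLateData(lateOutputTag)  // records later than that go to the side output
      .sum(1)                             // sum the count field

    res.print()
    res.getSideOutput(lateOutputTag).map(_.toString() + " is late").print()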

    env.execute()
  }

}

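// Periodic watermark assigner: the event timestamp is the 3rd field, and the
// watermark trails the largest timestamp seen so far by 2 seconds
class MyWatermarkAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(2)){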
  override def extractTimestamp(element: (String, Int, Long)): Long = {
    element._3
  }
}

Output:

(spark,10,1593050390000) is late
(spark,50,1593050400000)
(flink,10,1593050402000)
(spark,10,1593050310000) is late
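
To see where each output line comes from, the job can be replayed in plain Scala. This is a simplified simulation, not Flink itself. LatenessTrace, Rec and run are hypothetical names. The simulation assumes a watermark (largest timestamp seen minus 2 s) is emitted after every record, which is roughly what happens when lines are typed into the socket one at a time. Windows that reach the watermark together fire in the order they were created.

```scala
import scala.collection.mutable

// Hypothetical simulation of the job above (not Flink code).
object LatenessTrace {
  // one input line: word, count, event timestamp (ms)
  final case class Rec(key: String, value: Int, ts: Long)

  def run(input: Seq[Rec], size: Long = 5000L, lateness: Long = 3000L,
          outOfOrderness: Long = 2000L): Seq[String] = {
    val out = mutable.ArrayBuffer.empty[String]
    // (key, window start) -> (running sum, first record); sum keeps the first record's other fields
    val windows = mutable.LinkedHashMap.empty[(String, Long), (Int, Rec)]
    var maxSeen = Long.MinValue + outOfOrderness // initial max timestamp, as in the bounded extractor
    var watermark = Long.MinValue

    def windowMax(start: Long): Long = start + size - 1
    def fire(k: (String, Long)): Unit = {
      val (sum, first) = windows(k)
      out += s"(${first.key},$sum,${first.ts})"
    }

    for (r <- input) {
      val start = r.ts - Math.floorMod(r.ts, size)
      val key = (r.key, start)
      if (windowMax(start) + lateness <= watermark) {
        out += s"(${r.key},${r.value},${r.ts}) is late"   // too late: side output
      } else {
        val (sum, first) = windows.getOrElse(key, (0, r))
        windows(key) = (sum + r.value, first)
        if (windowMax(start) <= watermark) fire(key)     // late but allowed: fire again
      }
      // assumption: a watermark is emitted right after every record
      maxSeen = math.max(maxSeen, r.ts)
      val newWatermark = maxSeen - outOfOrderness
      if (newWatermark > watermark) {
        // first firing of windows whose end the watermark has just reached
        for ((k, _) <- windows if windowMax(k._2) > watermark && windowMax(k._2) <= newWatermark)
          fire(k)
        watermark = newWatermark
        // purge windows whose allowed lateness has expired
        windows.keys.toList
          .filter(k => windowMax(k._2) + lateness <= watermark)
          .foreach(k => windows.remove(k))
      }
    }
    out.toList
  }
}
```

Fed the sample input (without the blank line), run produces the four lines shown above. Calling it with lateness = 0L gives the comparison suggested below. Appending a hypothetical record Rec("spark", 5, 1593050403000L) shows the re-firing. With 3 s lateness the simulation then emits (spark,55,1593050400000). With lateness 0 the same record is reported as late.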

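You can compare this with the output of the same job without allowedLateness. Also make sure to import the correct OutputTag class. This Scala job uses org.apache.flink.streaming.api.scala.OutputTag, which derives the element's TypeInformation implicitly. It does not use the similarly named Java API class org.apache.flink.util.OutputTag.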
