2.4.3 Timestamps and Watermarks

Section 1.2.4 introduced the three time semantics that Flink supports; event time and processing time are the ones most commonly used, and you select one with env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) or env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime). So what is a watermark? Informally, a watermark is the point in time that delays a window's firing so that records lagging behind by up to a fixed amount are still placed into their original window for computation; it exists to handle lagging data. For example, given a window covering 10:00:00-10:10:00, the window would normally fire as soon as a record with time 10:10:00 arrives; but if the allowed lag is set to 5s, the window fires only once a record with a time greater than or equal to 10:10:05 arrives. The following formula determines when a window fires:

max(event_time) - max_lag_time >= window.end

Here max(event_time) is the largest event time seen so far, and window.end is the window's end time. Take the same 10:00:00-10:10:00 window, so window.end = 10:10:00, with max_lag_time = 5s. When a record with time 10:10:00 arrives and it carries the largest event time seen so far, the formula gives 10:10:00 - 5s = 10:09:55 < 10:10:00, so the window does not fire; but when a record with time 10:10:05 arrives, 10:10:05 - 5s = 10:10:00 >= 10:10:00, and the window fires. (As the official documentation notes, timestamps and watermarks are Long values in milliseconds; clock times are used here only to illustrate the mechanism.)
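To make the firing condition concrete, here is a minimal sketch (a hypothetical helper written for this book, not part of any Flink API) that evaluates the formula with the millisecond values Flink actually uses, taking 10:10:00 as milliseconds since midnight:

// Hypothetical helper that evaluates the firing formula; not a Flink API.
object TriggerCheck {
  def windowShouldFire(maxEventTime: Long, maxLagTime: Long, windowEnd: Long): Boolean =
    maxEventTime - maxLagTime >= windowEnd

  def main(args: Array[String]): Unit = {
    val windowEnd = 36600000L  // 10:10:00 as milliseconds since midnight
    println(windowShouldFire(36600000L, 5000L, windowEnd))  // record at 10:10:00 -> false
    println(windowShouldFire(36605000L, 5000L, windowEnd))  // record at 10:10:05 -> true
  }
}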

In code, watermarks are assigned with the assignTimestampsAndWatermarks method. Flink distinguishes two kinds of watermarks by how they are generated: periodic watermarks and punctuated watermarks. Periodic watermarks are generated at a fixed time interval, while punctuated watermarks are generated from the incoming data itself, for example whenever particular marker elements appear in the stream (a minimal punctuated sketch follows below for contrast). This section focuses on periodic watermarks. Flink ships two built-in periodic modes: an ascending mode, which assumes timestamps only ever increase and no data lags behind, and an out-of-order mode, which lets you specify how much lag to tolerate.
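For contrast, here is a minimal sketch of a punctuated assigner built on Flink's AssignerWithPunctuatedWatermarks interface; the convention of treating records keyed "marker" as watermark triggers is an assumption made up purely for illustration:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Punctuated sketch: emit a watermark only when a record whose key is "marker"
// arrives (the "marker" convention is hypothetical, purely for illustration).
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
  override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
    element._3

  // Returning null means "no watermark for this element"
  override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark =
    if (lastElement._1 == "marker") new Watermark(extractedTimestamp) else null
}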

(1) Ascending mode

package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 * spark,40,1593050405000
 * spark,100,1593050406000
 * flink,200,1593050407000
 * flink,300,1593050410000
 */
object AscendingWatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  // set the time characteristic to event time

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p=>Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)  // ascending mode: timestamps are assumed to only increase
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}

Output:

(spark,50,1593050400000)
(flink,30,1593050402000)
(spark,140,1593050405000)
(flink,200,1593050407000)

Note that the last record entered is not included in any result: window intervals are left-closed and right-open, so the record with timestamp 1593050410000 falls into the next window [1593050410000, 1593050415000), which never fires before the socket is closed.
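The way records are grouped in the output follows from how Flink aligns tumbling windows to the epoch; a quick sketch of that alignment rule (windowStart = timestamp - (timestamp % windowSize), assuming a zero window offset):

// Tumbling event-time windows of 5s align to the epoch: start = ts - (ts % 5000).
object WindowAlignment {
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  def main(args: Array[String]): Unit = {
    println(windowStart(1593050402000L, 5000L))  // 1593050400000 -> window [..400000, ..405000)
    println(windowStart(1593050407000L, 5000L))  // 1593050405000 -> window [..405000, ..410000)
    println(windowStart(1593050410000L, 5000L))  // 1593050410000 -> start of the next window
  }
}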

(2) Out-of-order mode

package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 *
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050403000
 * flink,200,1593050407000
 *
 * flink,300,1593050410000
 * hadoop,20,1593050411000
 * hadoop,20,1593050412000
 */
object UnorderedWatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p=>Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignTimestampsAndWatermarks(new MyWatermarkAssigner)  // out-of-order mode: tolerate up to 2s of lag
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}

// Bounded out-of-orderness: watermark = largest event time seen so far - 2s
class MyWatermarkAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(2)){
  override def extractTimestamp(element: (String, Int, Long)): Long = {
    element._3
  }
}

Output:

(spark,60,1593050400000)
(flink,30,1593050402000)
(spark,140,1593050405000)
(flink,200,1593050407000)
(flink,300,1593050410000)
(hadoop,40,1593050411000)

Note that the last two result lines are printed only when the socket is closed. It is worth entering the data line by line and observing how the results change.
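To see why the window [1593050400000, 1593050405000) fires only when the record with timestamp 1593050407000 arrives, here is a standalone trace (a sketch assuming the 2s bound used above) of how the watermark advances with each record:

// Trace watermark = max(event_time) - 2000 as each record of the example arrives.
object WatermarkTrace {
  def main(args: Array[String]): Unit = {
    val bound = 2000L
    var maxTs = Long.MinValue
    val timestamps = Seq(1593050400000L, 1593050401000L, 1593050402000L, 1593050402000L,
      1593050405000L, 1593050406000L, 1593050403000L, 1593050407000L)
    for (ts <- timestamps) {
      maxTs = math.max(maxTs, ts)
      println(s"event=$ts  watermark=${maxTs - bound}")
    }
    // Only the last record pushes the watermark to 1593050405000, at which point
    // the window [1593050400000, 1593050405000) fires.
  }
}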

(3) Custom timestamp assigner

A custom timestamp assigner is implemented mainly by extending AssignerWithPeriodicWatermarks[T].

package com.littlely.watermarkops

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Data:
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,10,1593050402000
 * flink,20,1593050402000
 * spark,40,1593050405000
 * spark,100,1593050406000
 * spark,10,1593050403000
 * flink,200,1593050407000
 * flink,300,1593050410000
 * hadoop,20,1593050411000
 * hadoop,20,1593050412000
 */
object UserDefinedTimestampExtractor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p=>Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.seconds(2)))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute()
  }
}

// Custom timestamp extractor
class MyTimestampExtractor extends AssignerWithPeriodicWatermarks[(String, Int, Long)] {
  private var lastWatermark = 0L
  private var maxTimestamp = 0L
  private var time: Long = _

  def this(time: Time){
    this()
    this.time = time.toMilliseconds  // Time is not directly serializable, so store its millisecond value instead
  }

  // Called periodically by Flink; the watermark must never move backwards
  override def getCurrentWatermark: Watermark = {
    val marks = maxTimestamp - time
    if (marks >= lastWatermark){
      lastWatermark = marks
    }
    new Watermark(lastWatermark)
  }

  // Called per record: track the largest event time seen so far (max(event_time) in the formula)
  override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long = {
    val currentTimestamp = element._3
    if (currentTimestamp > maxTimestamp){
      this.maxTimestamp = currentTimestamp
    }
    currentTimestamp
  }
}

Output:

(spark,60,1593050400000)
(flink,30,1593050402000)
(flink,200,1593050407000)
(spark,140,1593050405000)
(hadoop,40,1593050411000)
(flink,300,1593050410000)

Comparing (2) and (3), the results are the same (apart from the order in which they are printed), because the two implementations are equivalent. The implementation revolves around the getCurrentWatermark and extractTimestamp methods: extractTimestamp obtains the current record's timestamp and compares it against the largest timestamp seen so far, which yields max(event_time) from the formula, then returns the current timestamp; getCurrentWatermark computes a new watermark from that maximum, compares it against the previous watermark, and returns the larger of the two.
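One related knob worth knowing: getCurrentWatermark is not called once per record. With periodic watermarks, Flink polls it at a fixed interval (200 ms by default once the event-time characteristic is set), which can be changed on the env used in the examples above:

// Interval, in milliseconds, at which Flink calls getCurrentWatermark
env.getConfig.setAutoWatermarkInterval(100L)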
