2.4.5 Window Aggregation Functions

After a window's trigger fires and emits data (the default trigger is used here; custom triggers are covered later), the data in the window needs to be computed, and that is the job of the window aggregation function. There are several kinds, such as ReduceFunction, AggregateFunction, FoldFunction (deprecated in favor of AggregateFunction), and ProcessWindowFunction. ReduceFunction and AggregateFunction are relatively efficient because they aggregate data incrementally as it arrives; ProcessWindowFunction performs worse because it has to iterate over all the data in the window, but its advantage is that it can perform additional operations on the window, such as accessing window metadata.

ReduceFunction

First, look at the signature of the reduce function:

def reduce(function: (T, T) => T): DataStream[T]

reduce takes a single function as its argument; that function accepts two values of the same type and returns a result of that same type. An example:

package com.littlely.windowfuncs

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,20,1593050404000
 * spark,40,1593050405000
 *
 * spark,100,1593050406000
 * flink,200,1593050407000
 * flink,300,1593050410000
 *
 * flink,10,1593050415000
 */
object ReduceFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.length == 3) // guard against malformed lines
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2, v2._3))
      .print()

    env.execute()
  }
}
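
With the sample input shown in the comments, assignAscendingTimestamps emits watermarks equal to the latest timestamp minus 1 ms, so the first 10-second window [1593050400000, 1593050410000) fires when the element with timestamp 1593050410000 arrives; at that point the job should print roughly (spark,190,1593050406000) and (flink,220,1593050407000). Note that reduce cannot change the element type: input and output are both (String, Int, Long).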

AggregateFunction

AggregateFunction is a generalized version of ReduceFunction that involves three types: an input type, an intermediate state type, and an output type. Looking at its interface, AggregateFunction[IN, ACC, OUT]: IN is the type of the input elements, ACC is the type of the accumulator that holds the intermediate state, and OUT is the type of the result.

package com.littlely.windowfuncs

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,20,1593050404000
 * spark,40,1593050405000
 *
 * spark,100,1593050406000
 * flink,200,1593050407000
 * flink,300,1593050410000
 *
 * flink,10,1593050415000
 */
object AggregateFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.length == 3) // guard against malformed lines
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .aggregate(new MyAggregateFunc)
      .print()

    env.execute()
  }
}

// Computes the average of the second field per key and window
class MyAggregateFunc extends AggregateFunction[(String, Int, Long), (Int, Int), Double]{
  // Create the initial accumulator state (sum = 0, count = 0)
  override def createAccumulator(): (Int, Int) = {
    (0, 0)
  }

  // Add an input element to the intermediate state: accumulate the sum and the count
  override def add(value: (String, Int, Long), accumulator: (Int, Int)): (Int, Int) = {
    (value._2 + accumulator._1, accumulator._2 + 1)
  }

  // Compute the final result from the accumulator;
  // toDouble is needed to avoid integer division
  override def getResult(accumulator: (Int, Int)): Double = {
    accumulator._1.toDouble / accumulator._2
  }

  // Merge two intermediate states
  override def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
    (a._1 + b._1, a._2 + b._2)
  }
}
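
Two points are worth noting about this example. First, merge is only called by window assigners that merge windows, such as session windows; for the tumbling windows used here it is never invoked, but it still has to be implemented. Second, getResult converts the sum with toDouble before dividing; otherwise integer division would truncate the result. With the sample input, once the element with timestamp 1593050410000 arrives, the first window should emit an average of 47.5 for spark and 110.0 for flink.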

ProcessWindowFunction

ProcessWindowFunction has access to all of the elements in a window. Beyond that, through its Context it can obtain the current watermark, window metadata, and more, making it the most flexible of the window aggregation functions. Its drawback is that it must buffer all of the window's elements, which can lead to OOM.

package com.littlely.windowfuncs

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * spark,20,1593050400000
 * spark,30,1593050401000
 * flink,20,1593050404000
 * spark,40,1593050405000
 *
 * spark,100,1593050406000
 * flink,200,1593050407000
 * flink,300,1593050410000
 *
 * flink,10,1593050415000
 */
object ProcessFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.length == 3) // guard against malformed lines
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      // Note: the key argument of process is produced by the KeySelector passed to keyBy.
      // Keying by a tuple index here would give the key the generic Tuple type rather than
      // String, which fails to compile, so a KeySelector must be used.
      .keyBy(new KeySelector[(String, Int, Long), String] {
        override def getKey(value: (String, Int, Long)): String = value._1
      })
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new MyProcessFunction)
      .print()

    env.execute()
  }
}

class MyProcessFunction extends ProcessWindowFunction[(String, Int, Long), (String, Int), String, TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int, Long)], out: Collector[(String, Int)]): Unit = {
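    // The Context also exposes window metadata and the current watermark, e.g.:
    //   val windowStart = context.window.getStart  // window start timestamp
    //   val wm = context.currentWatermark          // watermark at firing time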
    var count: Int = 0
    for (i <- elements){
      count = count + i._2
    }
    out.collect((key, count))
  }
}
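
The incremental and full-window approaches can also be combined: reduce (and aggregate) have overloads that take a ProcessWindowFunction as a second argument. Elements are then pre-aggregated incrementally, and the ProcessWindowFunction receives a single pre-aggregated value per key together with the window metadata, avoiding the buffering problem described above. A minimal sketch, reusing the imports and types from the example above (the class name SumWithWindowEnd is just illustrative):

// The Iterable passed to process contains exactly one element:
// the pre-aggregated value produced by the reduce function.
class SumWithWindowEnd extends ProcessWindowFunction[(String, Int, Long), (String, Int, Long), String, TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int, Long)], out: Collector[(String, Int, Long)]): Unit = {
    val agg = elements.head
    // Replace the last event timestamp with the window end time
    out.collect((key, agg._2, context.window.getEnd))
  }
}

// Usage on the keyed, windowed stream from the example above:
// .window(TumblingEventTimeWindows.of(Time.seconds(10)))
// .reduce((v1, v2) => (v1._1, v1._2 + v2._2, v2._3), new SumWithWindowEnd)
// .print()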

That covers all the essential window operations; next we move on to some of the optional ones.
