2.4.5 Window Aggregate Functions
After a window's trigger fires (the default trigger, or a custom one, covered later), the data collected in the window needs to be computed; this is the job of the window aggregate function. There are several kinds, such as ReduceFunction, AggregateFunction, FoldFunction (deprecated in newer Flink versions in favor of AggregateFunction), and ProcessWindowFunction. ReduceFunction and AggregateFunction are comparatively efficient because they aggregate incrementally as each element arrives; ProcessWindowFunction performs worse because it must buffer and iterate over all of a window's elements, but in exchange it can perform extra operations on the window itself. The two approaches can also be combined, as the sketch at the end of this section shows.
ReduceFunction
First, look at the signature of reduce: def reduce(function: (T, T) => T): DataStream[T]. The argument passed to reduce is itself a function that takes two parameters of the same type and returns a value of that same type. Example:
package com.littlely.windowfuncs
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* spark,20,1593050400000
* spark,30,1593050401000
* flink,20,1593050404000
* spark,40,1593050405000
*
* spark,100,1593050406000
* flink,200,1593050407000
* flink,300,1593050410000
*
* flink,10,1593050415000
*/
object ReduceFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // Sum the counts per key, keeping the latest timestamp seen in the window
      .reduce((v1, v2) => (v1._1, v1._2 + v2._2, v2._3))
      .print()

    env.execute()
  }
}
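The same logic can also be written against the ReduceFunction interface from org.apache.flink.api.common.functions instead of a lambda, which helps when the function is to be reused or unit-tested on its own. A minimal sketch (the class name SumReduceFunc is illustrative, not from the example above):

import org.apache.flink.api.common.functions.ReduceFunction

// Sums the counts of two records for the same key, keeping the later timestamp
class SumReduceFunc extends ReduceFunction[(String, Int, Long)] {
  override def reduce(v1: (String, Int, Long), v2: (String, Int, Long)): (String, Int, Long) =
    (v1._1, v1._2 + v2._2, v2._3)
}

In the pipeline above, .reduce((v1, v2) => ...) would then become .reduce(new SumReduceFunc).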
AggregateFunction
AggregateFunction is a generalized version of ReduceFunction and involves three types: the input type, an intermediate state (accumulator) type, and the output type. The interface itself is declared as AggregateFunction[IN, ACC, OUT]; in the example below IN is (String, Int, Long), ACC is (Int, Int), and OUT is Double. (Be careful when reading the Scala API's signature def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]: ACC is the accumulator type, not the input type. The input type T is fixed by the stream itself, V is the pre-aggregated value type, and R the final result type.)
package com.littlely.windowfuncs
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* spark,20,1593050400000
* spark,30,1593050401000
* flink,20,1593050404000
* spark,40,1593050405000
*
* spark,100,1593050406000
* flink,200,1593050407000
* flink,300,1593050410000
*
* flink,10,1593050415000
*/
object AggregateFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .aggregate(new MyAggregateFunc)
      .print()

    env.execute()
  }
}
// Computes the per-key average
class MyAggregateFunc extends AggregateFunction[(String, Int, Long), (Int, Int), Double] {

  // Create the initial accumulator: (running sum, element count)
  override def createAccumulator(): (Int, Int) = {
    (0, 0)
  }

  // Add an input value to the intermediate state
  override def add(value: (String, Int, Long), accumulator: (Int, Int)): (Int, Int) = {
    (value._2 + accumulator._1, accumulator._2 + 1)
  }

  // Produce the final result; convert to Double first to avoid integer division
  override def getResult(accumulator: (Int, Int)): Double = {
    accumulator._1.toDouble / accumulator._2
  }

  // Merge two intermediate states; called only when windows merge
  // (see the session-window sketch below)
  override def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
    (a._1 + b._1, a._2 + b._2)
  }
}
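Note that merge is invoked only when the window assigner actually merges windows; tumbling windows never merge, so session windows are the typical case. A minimal sketch reusing MyAggregateFunc, assuming the same input format as the examples above (the object name SessionAggregateFunc is illustrative):

package com.littlely.windowfuncs

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SessionAggregateFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.socketTextStream("localhost", 9999)
      .map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      // Overlapping session windows are merged when events fall within the gap,
      // which is exactly when merge() runs on the accumulators
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
      .aggregate(new MyAggregateFunc)
      .print()

    env.execute()
  }
}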
ProcessWindowFunction
ProcessWindowFunction has access to all of a window's elements. Beyond that, through its Context it can obtain the current watermark, the window's metadata, and more, which makes it the most flexible of the window aggregate functions. Its drawback is that all of the window's elements must be buffered, which can lead to OOM for large windows.
package com.littlely.windowfuncs
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* spark,20,1593050400000
* spark,30,1593050401000
* flink,20,1593050404000
* spark,40,1593050405000
*
* spark,100,1593050406000
* flink,200,1593050407000
* flink,300,1593050410000
*
* flink,10,1593050415000
*/
object ProcessFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost", 9999)
    text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      // Note: the key type of process() is determined by the KeySelector passed
      // to keyBy. Keying by tuple index would leave the key type unresolved and
      // fail to compile, so a KeySelector must be used here.
      .keyBy(new KeySelector[(String, Int, Long), String] {
        override def getKey(value: (String, Int, Long)): String = value._1
      })
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new MyProcessFunction)
      .print()

    env.execute()
  }
}
// Sums the counts of all elements buffered in the window for the given key
class MyProcessFunction extends ProcessWindowFunction[(String, Int, Long), (String, Int), String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[(String, Int, Long)],
                       out: Collector[(String, Int)]): Unit = {
    var count: Int = 0
    for (i <- elements) {
      count = count + i._2
    }
    out.collect((key, count))
  }
}
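The incremental and full-window approaches can also be combined: the Scala API provides an aggregate overload that takes an AggregateFunction as a pre-aggregator together with a ProcessWindowFunction, so the window state holds only the accumulator while process() still receives the Context with window metadata. A minimal sketch reusing MyAggregateFunc from the previous example (the names CombinedFunc and WindowInfoFunc are illustrative):

package com.littlely.windowfuncs

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CombinedFunc {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.socketTextStream("localhost", 9999)
      .map(_.toLowerCase.split(",")).filter(_.nonEmpty)
      .map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
      .assignAscendingTimestamps(_._3)
      .keyBy(new KeySelector[(String, Int, Long), String] {
        override def getKey(value: (String, Int, Long)): String = value._1
      })
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // MyAggregateFunc aggregates incrementally; only its Double result
      // is handed to WindowInfoFunc when the window fires
      .aggregate(new MyAggregateFunc, new WindowInfoFunc)
      .print()

    env.execute()
  }
}

// Receives the single pre-aggregated value rather than all window elements
class WindowInfoFunc extends ProcessWindowFunction[Double, (String, Double, Long, Long), String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Double],
                       out: Collector[(String, Double, Long, Long)]): Unit = {
    val avg = elements.head // exactly one element: the aggregate's result
    // Context still exposes window metadata (and e.g. context.currentWatermark)
    out.collect((key, avg, context.window.getStart, context.window.getEnd))
  }
}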
At this point, all of the essential window operations have been covered; the following sections turn to some optional ones.