2.4.4 window操作
Window操作可以说是flink中最常用的操作了,它使得无界的数据流变成有界的数据流,这样可以控制数据接受的时间范围进而处理该时间范围内的数据,并且多个window操作可以使用多核进行并行处理。窗口的处理总共有两种方式分别是基于时间的窗口和基于数量的窗口,基于时间的窗口又有四种,分别是Tumbling window(滚动窗口),Sliding window(滑动窗口),Session window(会话窗口)和Global window(全局窗口),基于使用的频率,这里只说明Tumbling window和Sliding window。
在窗口计算时,又会根据上游数据是否按key进行分区又分为keyed window和non-keyed window。如果是keyed window,数据会根据key在不同的task中并行计算,得到的结果也是对每个key计算的结果,如果是non-keyed window,窗口的所有数据会在一个task中运行得到全局的结果。这里主要使用keyed window。
(1) window assigner
Window assigner主要作用就是如何确定把数据分配到窗口之中。flink有内置的window assigner操作,它们都是基于时间,当然也可以通过扩展WindowAssigner
类来自己实现自定义的window assigner。window assigner可以操作滚动窗口和滑动窗口,先说一下什么是窗口,滚动窗口和滑动窗口,窗口就是定义了一个开始时间和结束时间,在开始时间和结束时间之间的这一段时间就是一个窗口,例如时间10:00:00 - 10:10:00这就是一个10min的时间窗口,这个10min就是窗口的大小(window size);滚动窗口就是有一个固定的窗口大小,比如10min,窗口在移动的时候不重叠,例如10:00:00 - 10:10:00是一个窗口,移动到下一个窗口为10:10:00-10:20:00,并且这两个窗口之间的时间不重叠(窗口是左闭右开的区间);滑动窗口也有一个固定的窗口大小,不过在移动窗口的时候有重叠,例如10:00:00 - 10:10:00是一个窗口,移动到下一个窗口10:05:00-10:15:00,可以看到这两个窗口有明显的时间重叠,这个窗口就是大小为10min,步长(slide)为5min的滑动窗口。
内置的滚动窗口函数和滑动窗口函数分别是TumblingEventTimeWindow
(处理event time的,TumblingProcessingTimeWindow
为处理processing time的)和SlidingEventTimeWindow
(处理event time的,SlidingProcessingTimeWindow
为处理processing time的)。
这里分别看它们的一个例子:
Tumbling window
package com.littlely.windowops
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
* spark,20,1593050400000
* spark,30,1593050401000
* flink,10,1593050402000
* flink,20,1593050402000
* spark,40,1593050405000
* spark,100,1593050406000
* flink,200,1593050407000
* flink,300,1593050410000
*/
object TumblingWindows {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999)
text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
.map(p=>Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
.assignAscendingTimestamps(_._3)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 滚动窗口,窗口大小为5s
.sum(1)
.print()
env.execute()
}
}
运行结果:
(spark,50,1593050400000) (flink,30,1593050402000) (spark,140,1593050405000) (flink,200,1593050407000) (flink,300,1593050410000)
注意,在使用TumblingEventTimeWindows
时需要指定event time时间语义以及设定watermark。
Sliding window
package com.littlely.windowops
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
/**
* spark,20,1593050400000
* spark,30,1593050401000
* flink,20,1593050404000
* spark,40,1593050405000
*
* spark,100,1593050406000
* flink,200,1593050407000
* flink,300,1593050410000
*
* flink,10,1593050415000
*/
object SlidingWindows {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999)
text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
.map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
.assignAscendingTimestamps(_._3)
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动窗口,窗口大小为10s,步长5s
.sum(1)
.print()
env.execute()
}
}
运行结果:
(spark,50,1593050400000) (flink,20,1593050404000) (spark,190,1593050400000) (flink,220,1593050404000) (flink,500,1593050407000) (spark,140,1593050405000)
可以一条一条地输入数据,看看结果是怎么变化的。
自定义window assigner
一般前两种window assigner就够用了,只不过在特殊情况下,自己需要的窗口触发方式和它们定义的触发方式有些差别,这些在讲解trigger的时候再进行说明。