2.4.7 Evictors
Evictor就是对windowFunction前后的数据做一下清理工作,只保留想要的数据进行一些窗口函数的操作或者窗口函数操作后只保留一些想要的结果。Flink内部实现了CountEvictor
,DeltaVictor
和TimeEvictor
。以CountEvictor
为例进行一下演示。
package com.littlely.evictors
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
/**
* 数据:
* spark,20,1593050400000
* spark,30,1593050401000
* flink,10,1593050402000
* flink,20,1593050402000
* flink,20,1593050402000
* spark,40,1593050405000
*/
object CountEvictorOps {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = env.socketTextStream("localhost", 9999)
text.map(_.toLowerCase.split(",")).filter(_.nonEmpty)
.map(p => Tuple3[String, Int, Long](p(0), p(1).toInt, p(2).toLong))
.assignAscendingTimestamps(_._3)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.evictor(CountEvictor.of(2L)) // evictor保留个数为2
.reduce((v1, v2) => (v1._1, v1._2 + v2._2, v2._3))
.print()
env.execute()
}
}
根据如上数据,如果关掉evictor那一行代码结果为:
(spark,50,1593050401000) (flink,50,1593050402000)
如果打开evictor那一行代码结果为:
(spark,50,1593050401000) (flink,40,1593050402000)
可以看到,打开之后,只保留了同一个窗口同一个key的两条数据,也就是只保留一个pane的两条数据。
查看CountEvictor
源码中的evictor
方法:
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
if (size <= maxCount) {
return;
} else {
int evictedCount = 0;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
iterator.next();
evictedCount++;
if (evictedCount > size - maxCount) {
break;
} else {
iterator.remove();
}
}
}
}
从上方法可以看到,它只保留你设置的最大保留个数元素,其他的从窗口开始进行删除。
当然也可以自己定义evictor,直接重写evictorBefore
和evictorAfter
方法来实现清除窗口函数前后数据的逻辑。