2.2 Data Sources
Flink数据的接入接口非常丰富,有基于文件的,有基于socket的,有基于collection的,也有基于自定义的,当然还有第三方的connector,例如kafka等,这个后面会讲。基于socket和基于collection的一般在测试的时候使用,后面测试时基本上都使用socket。
这里主要说明一下自定义DataSource,自定义的source可以通过实现SourceFunction
接口或者RichSourceFunction
这个抽象类来完成,然后通过StreamExecutionEnvironment
的addSource
方法添加数据源。
package com.littlely.udfsource
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
object UDFSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val persons = env.addSource(new MySource)
persons.map(p => (p.name, p.salary))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
env.execute("SalarySum")
}
}
case class Person(name: String, salary: Int)
class MySource extends SourceFunction[Person]{
var isRunning = true
val names = List("Alice", "Mike")
val salaries = List(20, 10, 40, 70)
val random = new Random(4321)
override def run(ctx: SourceFunction.SourceContext[Person]): Unit = {
while (true){
val selectSalary = random.nextInt(4)
val selectName = random.nextInt(2)
val person = Person(name=names(selectName), salary=salaries(selectSalary))
ctx.collect(person)
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
6> (Mike,20) 6> (Alice,40) 6> (Alice,100) 6> (Mike,60) 6> (Alice,130) 6> (Mike,90) 6> (Alice,160) 6> (Mike,10)
实现的SourceFunction
接口主要有两个方法,一个是run
方法,另一个是cancel
方法,run
方法主要是向外部发送数据,而cancel
方法就是在调用时能够保证取消数据的输出。SourceFunction
还有一个内部接口SourceContext
,不仅可以用来发送数据,而且还可以增加TimeStamp并发送watermark。