2.2 Data Sources

Flink offers a rich set of data-ingestion interfaces: file-based, socket-based, collection-based, and user-defined sources, as well as third-party connectors such as Kafka, which will be covered later. Socket- and collection-based sources are generally used for testing, and most of the tests later on use sockets.
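For reference, the built-in sources mentioned above look roughly like this. This is only a sketch: the host, port, and file path are placeholders, and running it requires a Flink runtime on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

object BuiltinSources {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Collection-based: bounded, convenient for tests
    val fromCollection = env.fromCollection(Seq(1, 2, 3))
    // Socket-based: newline-delimited text, e.g. fed by `nc -lk 9999`
    val fromSocket = env.socketTextStream("localhost", 9999)
    // File-based: reads a text file line by line
    val fromFile = env.readTextFile("/tmp/input.txt")

    fromCollection.print()
    env.execute("builtin-sources")
  }
}
```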

Here we focus on user-defined DataSources. A custom source is written by implementing the SourceFunction interface or extending the RichSourceFunction abstract class, and is then registered with the StreamExecutionEnvironment's addSource method.

package com.littlely.udfsource

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

object UDFSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val persons = env.addSource(new MySource)

    persons.map(p => (p.name, p.salary))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute("SalarySum")
  }
}

case class Person(name: String, salary: Int)

class MySource extends SourceFunction[Person] {
  // Flag checked by the emit loop; @volatile because cancel() may be
  // called from a different thread than run()
  @volatile var isRunning = true
  val names = List("Alice", "Mike")
  val salaries = List(20, 10, 40, 70)
  val random = new Random(4321)

  override def run(ctx: SourceFunction.SourceContext[Person]): Unit = {
    // Emit one random Person per second until the source is cancelled
    while (isRunning) {
      val selectSalary = random.nextInt(4)
      val selectName = random.nextInt(2)
      val person = Person(name = names(selectName), salary = salaries(selectSalary))

      ctx.collect(person)
      Thread.sleep(1000)
    }
  }

  // Called by Flink on job cancellation; stops the loop in run()
  override def cancel(): Unit = {
    isRunning = false
  }
}
Sample output:

6> (Mike,20)
6> (Alice,40)
6> (Alice,100)
6> (Mike,60)
6> (Alice,130)
6> (Mike,90)
6> (Alice,160)
6> (Mike,10)

The SourceFunction interface has two methods to implement: run, which emits records downstream, and cancel, which must ensure that a running source stops emitting when it is called. SourceFunction also has an inner interface, SourceContext, which can not only emit records but also attach timestamps to them and emit watermarks.
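As a sketch of that last point, a source can attach event timestamps via collectWithTimestamp and emit watermarks via emitWatermark. The class below is hypothetical (it is not part of the example above), and the one-second watermark lag is an arbitrary choice for illustration:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

// Event-time source sketch: each (name, salary) record carries its own
// timestamp, and the watermark trails the emitted time by one second.
class TimestampedSource extends SourceFunction[(String, Int)] {
  @volatile var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
    while (isRunning) {
      val ts = System.currentTimeMillis()
      // Emit the record together with its event timestamp
      ctx.collectWithTimestamp(("Alice", 20), ts)
      // Declare that no event with timestamp <= ts - 1000 will follow
      ctx.emitWatermark(new Watermark(ts - 1000))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = isRunning = false
}
```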
