2.5.1 Kafka connector

Flink supports Kafka 0.8 and later. The examples in this section were tested against Kafka 2.1.0, although 0.8 and 1.0 are still the versions most often found in production, so adjust the dependency to your own environment. First, add the Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

The dependencies for other versions are described in the official documentation and can be used as a reference.
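
If the project is built with sbt rather than Maven, the equivalent dependency would look roughly like this (a sketch assuming the same Scala 2.11 / Flink 1.10.0 versions as above):

libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.11" % "1.10.0"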

(1) Kafka Consumer

package com.littlely.kafkaconnector

import java.util.Properties
import java.util
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord


object KafkaSourceTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Variant 1: DeserializationSchema sees only the record value
//    val kafkaSource = env.addSource(new FlinkKafkaConsumer[JSONObject]("test", new MyDeserializer(), props))
//    kafkaSource.map(p => (p.getString("name"), p.getInteger("age"))).print()

    // Variant 2: KafkaDeserializationSchema sees the whole ConsumerRecord (key, value, offset, timestamp, ...)
    val kafkaSource = env.addSource(new FlinkKafkaConsumer[JSONObject]("test", new MyKafkaDeserializer, props))
    kafkaSource.map(p =>
      (p.getString("kk"), p.getString("kv"), p.getLong("offset"), p.getLong("timestamp"))).print()

    env.execute()
  }
}

// DeserializationSchema variant: parses only the Kafka record value as a JSON object
class MyDeserializer extends DeserializationSchema[JSONObject]{
  override def deserialize(message: Array[Byte]): JSONObject = {
    JSON.parseObject(new String(message))
  }

  override def isEndOfStream(nextElement: JSONObject): Boolean = false

  override def getProducedType: TypeInformation[JSONObject] = {
    TypeInformation.of(new TypeHint[JSONObject] {})
  }
}

// KafkaDeserializationSchema variant: has access to the whole ConsumerRecord (key, value, offset, timestamp)
class MyKafkaDeserializer extends KafkaDeserializationSchema[JSONObject]{
  override def isEndOfStream(nextElement: JSONObject): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): JSONObject ={
    // The record key may be null, so guard against it before decoding the bytes
    val k = if (record.key() != null) new String(record.key()) else null
    val v = record.value()
    val offset = record.offset()
    val timestamp = record.timestamp()

    // Note: fastjson is used here to parse the JSON string. When writing Scala, a Scala Map
    // cannot be passed to JSONObject.putAll -- only a java.util.Map type works.
    val map = new util.HashMap[String, Any]()
    map.put("kk", k)
    map.put("kv", JSON.parseObject(new String(v)))
    map.put("offset", offset)
    map.put("timestamp", timestamp)

    val jsonObject = new JSONObject()
    jsonObject.putAll(map)
    jsonObject
  }

  override def getProducedType: TypeInformation[JSONObject] = TypeInformation.of(new TypeHint[JSONObject]{})
}

The Kafka consumer generally takes three arguments. The first is the Kafka topic. The second is a DeserializationSchema / KafkaDeserializationSchema used to deserialize the Kafka data; DeserializationSchema and KafkaDeserializationSchema override the same methods, and the only difference lies in what deserialize processes: DeserializationSchema deserializes only the Kafka record value, whereas KafkaDeserializationSchema can process the whole Kafka ConsumerRecord, so it carries more information than DeserializationSchema. The third argument is a Properties object with Kafka settings such as the broker addresses (bootstrap.servers) and group.id.
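
When no custom schema is needed, the record value can also be consumed as a plain string with the built-in SimpleStringSchema, and the consumer's start position can be set explicitly. The following is a minimal sketch under those assumptions, reusing the same topic and properties as above:

package com.littlely.kafkaconnector

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SimpleKafkaSourceTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // SimpleStringSchema deserializes only the record value as a UTF-8 string
    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
    // Read the topic from the beginning instead of from the committed group offsets
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()
    env.execute()
  }
}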

(2) Kafka Producer

package com.littlely.kafkaconnector

import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Checkpointing drives the producer's delivery guarantee (flush/commit on checkpoint); interval 500 ms
    env.enableCheckpointing(500)

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")

    val kafkaSink = new FlinkKafkaProducer[String](
      "test",
      new MyKafkaSerializer("test"),
      props,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

    val input = env.socketTextStream("localhost", 9999)
    input.addSink(kafkaSink)

    env.execute()
  }
}

// Custom Kafka serializer: turns each String element into a ProducerRecord for the given topic
class MyKafkaSerializer(topic: String) extends KafkaSerializationSchema[String]{
  override def serialize(element: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    // Only the value is set here; the record key is left empty
    val value = element.getBytes()
    new ProducerRecord[Array[Byte], Array[Byte]](topic, value)
  }
}
}

Then open a Kafka console consumer and netcat, run the program, and type data into netcat; the data will appear in the Kafka console consumer. The Kafka sink is fairly straightforward; the main thing to watch is that the Scala version of the dependency matches the Scala version of your project, otherwise the job cannot write data.
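
For stronger delivery guarantees, the same producer constructor also accepts EXACTLY_ONCE semantics; in that case the producer's transaction timeout usually has to be lowered so it does not exceed the broker's transaction.max.timeout.ms. A sketch under those assumptions, reusing the topic and MyKafkaSerializer from above:

package com.littlely.kafkaconnector

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaExactlyOnceSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // EXACTLY_ONCE relies on checkpointing to commit Kafka transactions
    env.enableCheckpointing(500)

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    // Flink's default producer transaction timeout (1 hour) exceeds the broker's default
    // transaction.max.timeout.ms (15 minutes), so lower it for EXACTLY_ONCE
    props.put("transaction.timeout.ms", "600000")

    val kafkaSink = new FlinkKafkaProducer[String](
      "test",
      new MyKafkaSerializer("test"),
      props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    env.socketTextStream("localhost", 9999).addSink(kafkaSink)
    env.execute()
  }
}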
