Flink——实战之Redis Sink
主要内容:实现Flink写数据到Redis,即Redis Sink。
准备工作
Flink Redis Connector
提供了一个Sink可将数据写入Redis。若要使用该连接器需要将以下内容引入工程:
<!-- redis connector -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
代码实现
核心代码在于:
streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))
这里你需要两个参数:
redisConf
:redis配置信息
// 配置redis conf
val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("host")
.setPort(6379)
.setTimeout(30000)
.build()
MyRedisMapper
:自定义的RedisMapper
//自定义MyRedisMapper
class MyRedisMapper extends RedisMapper[Person] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
}
// redis filed
override def getKeyFromData(t: Person): String = {
t.name
}
// redis value
override def getValueFromData(t: Person): String = {
t.age + ":" + t.gender + ":" + t.height
}
完整代码如下:
package org.ourhome.streamapi
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* @Author Do
* @Date 2020/4/18 21:19
*/
object RedisSinkTest {
private val REDIS_KEY = "person_message"
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
val runType: String = params.get("runtype")
println("runType: " + runType)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.enableCheckpointing(5000)
env.setStateBackend(new FsStateBackend("file:///D:/Work/Code/flinkdev/src/main/resources/checkpoint"))
val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
// 处理inputStream,包装成Person类
val streaming: DataStream[Person] = inputStream.map(line => {
println(line)
val strings: Array[String] = line.split(",")
Person(strings(0).trim, strings(1).trim.toInt, strings(2).trim, strings(3).trim.toFloat)
})
// 配置redis conf
val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("host")
.setPort(6379)
.setTimeout(30000)
.build()
streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))
env.execute("Redis Sink")
}
case class Person (name: String, age: Int, gender: String, height: Float)
//自定义MyRedisMapper
class MyRedisMapper extends RedisMapper[Person] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
}
// redis filed
override def getKeyFromData(t: Person): String = {
t.name
}
// redis value
override def getValueFromData(t: Person): String = {
t.age + ":" + t.gender + ":" + t.height
}
}
}
customdata.txt文件
小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3
Redis数据:
row | key | value |
---|---|---|
1 | 小明 | 20:man:180.2 |
2 | 小红 | 22:woman:178.4 |
3 | 小黑 | 18:man:192.9 |
4 | 小兰 | 19:woman:188.0 |
5 | 小爱 | 30:woman:177.3 |