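Flink Principles and Practice: RedisSink
Overview: this article analyzes the RedisSink source code and walks through a concrete example of writing data from Flink to Redis.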
The Flink Redis Connector provides a sink that writes data to Redis. To use the connector, add the following dependency to your project:
<!-- redis connector -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
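1 Source Code Analysis
First, look at the inheritance hierarchy of the RedisSink class. RedisSink extends RichSinkFunction, which extends AbstractRichFunction and implements SinkFunction; AbstractRichFunction in turn implements the RichFunction interface.
Through this indirect inheritance of RichFunction, RedisSink gains lifecycle methods such as open() and close() and can access the function's runtime context.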
The RedisSink constructor takes two parameters:
- FlinkJedisConfigBase: the Redis connection configuration
- RedisMapper: generates the Redis command and the key/value pair
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
this.flinkJedisConfigBase = flinkJedisConfigBase;
this.redisSinkMapper = redisSinkMapper;
RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
this.redisCommand = redisCommandDescription.getCommand();
this.additionalKey = redisCommandDescription.getAdditionalKey();
}
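FlinkJedisConfigBase analysis
FlinkJedisClusterConfig, FlinkJedisSentinelConfig and FlinkJedisPoolConfig all extend the abstract class FlinkJedisConfigBase and correspond to Redis cluster mode, sentinel mode and single-node mode, respectively. Each class has an inner Builder class for setting parameters such as host, port and timeout. Pick the class that matches your Redis deployment.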
RedisMapper analysis
The following usage example shows how a RedisMapper generates the Redis command and the key/value pair:
class RedisTestMapper extends RedisMapper[Array[String]] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.LPUSH)
}
override def getKeyFromData(data: Array[String]): String = {
data(0)
}
override def getValueFromData(data: Array[String]): String = {
data(1)
}
}
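To spell out the mapper contract in the same language as the connector source, here is a minimal Java sketch. It is a simplified model, not the Bahir API: the interface SimpleRedisMapper and its methods command(), keyFromData() and valueFromData() are hypothetical stand-ins for RedisMapper's getCommandDescription(), getKeyFromData() and getValueFromData(). ArrayMapper mirrors the Scala RedisTestMapper above.

```java
/**
 * Hypothetical, simplified model of the RedisMapper contract (not the Bahir interface):
 * a mapper names the command and extracts the key and value from each record.
 */
class MapperSketch {

    // Stand-in for RedisMapper<T>; method names are illustrative only.
    interface SimpleRedisMapper<T> {
        String command();             // role of getCommandDescription()
        String keyFromData(T data);   // role of getKeyFromData()
        String valueFromData(T data); // role of getValueFromData()
    }

    // Java counterpart of RedisTestMapper: LPUSH data[1] onto the list named data[0].
    static final class ArrayMapper implements SimpleRedisMapper<String[]> {
        @Override public String command() { return "LPUSH"; }
        @Override public String keyFromData(String[] data) { return data[0]; }
        @Override public String valueFromData(String[] data) { return data[1]; }
    }

    // Render the Redis command a sink would issue for one record.
    static <T> String render(SimpleRedisMapper<T> mapper, T record) {
        return mapper.command() + " " + mapper.keyFromData(record) + " " + mapper.valueFromData(record);
    }

    public static void main(String[] args) {
        System.out.println(render(new ArrayMapper(), new String[] {"mylist", "hello"}));
        // prints: LPUSH mylist hello
    }
}
```

The mapper only describes the record. The command is fixed once per mapper, and the mapper never talks to Redis itself.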
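2 RedisSink Method Analysis
RedisSink overrides three methods:
- open(): initializes the Redis connection for the configured mode (cluster, sentinel or single-node). It builds a redisCommandsContainer, an object implementing the RedisCommandsContainer interface that exposes the available Redis commands, and then opens it.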
@Override
public void open(Configuration parameters) throws Exception {
try {
this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
this.redisCommandsContainer.open();
} catch (Exception e) {
LOG.error("Redis has not been properly initialized: ", e);
throw e;
}
}
- close(): closes the container that holds the Redis commands, which effectively closes the Redis connection.
@Override
public void close() throws IOException {
if (redisCommandsContainer != null) {
redisCommandsContainer.close();
}
}
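- invoke(): for each input record, uses the RedisMapper to extract the key and value, then sends them to Redis with the command fixed by the RedisCommandDescription (read once in the constructor). The command is not chosen per record. Only eight basic command types are supported, and anything else must be implemented yourself: LPUSH/RPUSH, SADD, SET, PUBLISH, PFADD, ZADD, ZREM, HSET. For ZADD, ZREM and HSET, the Redis key is the additionalKey from the RedisCommandDescription. The mapper's key becomes the sorted-set member or hash field, and for ZADD the mapper's value is passed as the score.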
@Override
public void invoke(IN input) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
case PFADD:
this.redisCommandsContainer.pfadd(key, value);
break;
case PUBLISH:
this.redisCommandsContainer.publish(key, value);
break;
case ZADD:
this.redisCommandsContainer.zadd(this.additionalKey, value, key);
break;
case ZREM:
this.redisCommandsContainer.zrem(this.additionalKey, key);
break;
case HSET:
this.redisCommandsContainer.hset(this.additionalKey, key, value);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}
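The argument order in the ZADD, ZREM and HSET branches is the easiest part to misread, so here is a minimal in-memory Java model of the dispatch. The class InvokeSketch, its Command enum and its map fields are hypothetical stand-ins for the real sink and a real Redis server. The model covers only a subset of the commands (PFADD and PUBLISH are omitted). It assumes, as the Redis syntax ZADD key score member suggests, that the container's second zadd argument is the score.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory model of RedisSink.invoke()'s dispatch (not the Bahir classes):
 * shows which argument ends up as Redis key, hash field, set member or score.
 */
class InvokeSketch {
    enum Command { LPUSH, RPUSH, SADD, SET, HSET, ZADD, ZREM }

    // Fake Redis state, one map per data type.
    final Map<String, String> strings = new HashMap<>();
    final Map<String, Deque<String>> lists = new HashMap<>();
    final Map<String, Set<String>> sets = new HashMap<>();
    final Map<String, Map<String, String>> hashes = new HashMap<>();
    final Map<String, Map<String, Double>> zsets = new HashMap<>();

    final Command command;      // fixed once, like redisCommand in the constructor
    final String additionalKey; // used as the Redis key by HSET, ZADD and ZREM

    InvokeSketch(Command command, String additionalKey) {
        this.command = command;
        this.additionalKey = additionalKey;
    }

    // key and value play the roles of getKeyFromData() and getValueFromData().
    void invoke(String key, String value) {
        switch (command) {
            case LPUSH: lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value); break;
            case RPUSH: lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(value); break;
            case SADD:  sets.computeIfAbsent(key, k -> new HashSet<>()).add(value); break;
            case SET:   strings.put(key, value); break;
            // HSET additionalKey key value: the mapper's key is the hash FIELD.
            case HSET:  hashes.computeIfAbsent(additionalKey, k -> new HashMap<>()).put(key, value); break;
            // ZADD additionalKey value key: the mapper's value is the SCORE, its key the MEMBER.
            case ZADD:  zsets.computeIfAbsent(additionalKey, k -> new HashMap<>()).put(key, Double.valueOf(value)); break;
            // ZREM additionalKey key: only the mapper's key (the member) is used.
            case ZREM: {
                Map<String, Double> zset = zsets.get(additionalKey);
                if (zset != null) {
                    zset.remove(key);
                }
                break;
            }
            default: throw new IllegalArgumentException("Cannot process such data type: " + command);
        }
    }

    public static void main(String[] args) {
        InvokeSketch zadd = new InvokeSketch(Command.ZADD, "scores");
        zadd.invoke("alice", "92.5");     // like ZADD scores 92.5 alice
        System.out.println(zadd.zsets);   // {scores={alice=92.5}}

        InvokeSketch hset = new InvokeSketch(Command.HSET, "person_message");
        hset.invoke("Tom", "20:man:180.2"); // like HSET person_message Tom 20:man:180.2
        System.out.println(hset.hashes);    // {person_message={Tom=20:man:180.2}}
    }
}
```

One consequence follows directly from this order: with ZADD, getValueFromData() must return something parseable as a number, because it is used as the score rather than stored as data.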
3 RedisSink Execution Flow
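- Create a FlinkJedisConfigBase object for the chosen deployment mode (cluster, single-node or sentinel);
- Create a RedisMapper object that selects the Redis command and specifies the key/value pair;
- Create the RedisSink from these two arguments: new RedisSink(flinkJedisConfigBase, redisMapper);
- When the job starts, Flink calls the RedisSink's open() method. That method calls RedisCommandsContainerBuilder, which builds a RedisCommandsContainer based on the type of the FlinkJedisConfigBase object: a FlinkJedisClusterConfig yields a RedisClusterContainer, while a FlinkJedisSentinelConfig or FlinkJedisPoolConfig yields a RedisContainer.
RedisClusterContainer and RedisContainer are the two implementations of the RedisCommandsContainer interface, and they carry out the actual Redis operations.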
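The type-based selection in the last step can be modeled in a few lines of plain Java. This is a hypothetical sketch with stub classes that only borrow the connector's class names. Based on our reading of the connector source, RedisContainer wraps a JedisPool or JedisSentinelPool and RedisClusterContainer wraps a JedisCluster, but here those pools are just strings. The exception message is illustrative.

```java
/**
 * Hypothetical, simplified model of how RedisCommandsContainerBuilder picks a container
 * from the runtime type of the config. Class names mirror the connector; bodies are stubs.
 */
class BuilderSketch {
    abstract static class FlinkJedisConfigBase {}
    static final class FlinkJedisPoolConfig extends FlinkJedisConfigBase {}
    static final class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {}
    static final class FlinkJedisClusterConfig extends FlinkJedisConfigBase {}

    interface RedisCommandsContainer { String describe(); }

    // Stub: records which kind of Jedis pool the real container would wrap.
    static final class RedisContainer implements RedisCommandsContainer {
        private final String pool;
        RedisContainer(String pool) { this.pool = pool; }
        @Override public String describe() { return "RedisContainer(" + pool + ")"; }
    }

    static final class RedisClusterContainer implements RedisCommandsContainer {
        @Override public String describe() { return "RedisClusterContainer(JedisCluster)"; }
    }

    // Dispatch on the concrete config type, as described in the execution flow.
    static RedisCommandsContainer build(FlinkJedisConfigBase config) {
        if (config instanceof FlinkJedisPoolConfig) {
            return new RedisContainer("JedisPool");         // single-node mode
        } else if (config instanceof FlinkJedisSentinelConfig) {
            return new RedisContainer("JedisSentinelPool"); // sentinel mode
        } else if (config instanceof FlinkJedisClusterConfig) {
            return new RedisClusterContainer();             // cluster mode
        }
        throw new IllegalArgumentException("Unsupported Jedis configuration: " + config);
    }

    public static void main(String[] args) {
        System.out.println(build(new FlinkJedisPoolConfig()).describe());
        System.out.println(build(new FlinkJedisSentinelConfig()).describe());
        System.out.println(build(new FlinkJedisClusterConfig()).describe());
    }
}
```

Because the sink depends only on the RedisCommandsContainer interface, invoke() is identical for all three deployment modes. Only the config object passed to the constructor changes.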
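4 Application Example
Use Flink to sink text data into Redis. Redis runs in single-node mode, so the example uses FlinkJedisPoolConfig.
Complete code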
package org.ourhome.streamapi
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* @Author Do
* @Date 2020/4/18 21:19
*/
object RedisSinkTest {
private val REDIS_KEY = "person_message"
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)
val runType: String = params.get("runtype")
println("runType: " + runType)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.enableCheckpointing(5000)
env.setStateBackend(new FsStateBackend("file:///D:/Work/Code/flinkdev/src/main/resources/checkpoint"))
val inputStream: DataStream[String] = env.readTextFile("D:\\Work\\Code\\flinkdev\\src\\main\\resources\\textfile\\customdata.txt")
// Parse each line of inputStream into a Person
val streaming: DataStream[Person] = inputStream.map(line => {
println(line)
val strings: Array[String] = line.split(",")
Person(strings(0).trim, strings(1).trim.toInt, strings(2).trim, strings(3).trim.toFloat)
})
// Configure the Redis connection (single-node mode)
val redisConf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("host")
.setPort(6379)
.setTimeout(30000)
.build()
streaming.addSink(new RedisSink[Person](redisConf, new MyRedisMapper))
env.execute("Redis Sink")
}
case class Person (name: String, age: Int, gender: String, height: Float)
// Custom MyRedisMapper: writes each Person as a field of the Redis hash REDIS_KEY
class MyRedisMapper extends RedisMapper[Person] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, REDIS_KEY)
}
// Redis hash field
override def getKeyFromData(t: Person): String = {
t.name
}
// redis value
override def getValueFromData(t: Person): String = {
t.age + ":" + t.gender + ":" + t.height
}
}
}
Contents of customdata.txt:
小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3
Resulting data in Redis (fields of the hash person_message):
row | field | value |
---|---|---|
1 | 小明 | 20:man:180.2 |
2 | 小红 | 22:woman:178.4 |
3 | 小黑 | 18:man:192.9 |
4 | 小兰 | 19:woman:188.0 |
5 | 小爱 | 30:woman:177.3 |
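To make the link between customdata.txt and the table above explicit, here is a plain-Java replay of the job without Flink or Redis. It is a hypothetical sketch: PipelineSketch, parse(), value() and run() are illustrative names. A LinkedHashMap stands in for the hash person_message, and put() plays the role of HSET.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java replay of the example job (no Flink, no Redis): parse each
 * line of customdata.txt into a Person, then apply HSET person_message <name> <value>
 * to an in-memory map standing in for the Redis hash.
 */
class PipelineSketch {
    // Mirrors the Scala case class Person(name, age, gender, height).
    static final class Person {
        final String name; final int age; final String gender; final float height;
        Person(String name, int age, String gender, float height) {
            this.name = name; this.age = age; this.gender = gender; this.height = height;
        }
    }

    // Same parsing as the map() step: split on commas, trim, convert types.
    static Person parse(String line) {
        String[] s = line.split(",");
        return new Person(s[0].trim(), Integer.parseInt(s[1].trim()), s[2].trim(), Float.parseFloat(s[3].trim()));
    }

    // Same value format as MyRedisMapper.getValueFromData: age:gender:height.
    static String value(Person p) {
        return p.age + ":" + p.gender + ":" + p.height;
    }

    static Map<String, String> run(String[] lines) {
        Map<String, String> hash = new LinkedHashMap<>(); // stands in for hash "person_message"
        for (String line : lines) {
            Person p = parse(line);
            hash.put(p.name, value(p)); // HSET: field = name, value = age:gender:height
        }
        return hash;
    }

    public static void main(String[] args) {
        String[] lines = {
            "小明,20,man,180.2", "小红,22,woman,178.4", "小黑,18,man,192.9",
            "小兰,19,woman,188.0", "小爱,30,woman,177.3"
        };
        run(lines).forEach((field, v) -> System.out.println(field + " -> " + v));
    }
}
```

HSET overwrites an existing field. A second record with the same name therefore replaces the earlier value instead of adding a row, and replaying the same records leaves the hash unchanged for this particular mapping.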