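Flink in Practice: MySQL Sink
Main content: implementing a Flink sink that writes data to MySQL, i.e. a MySQL sink.
Preparation
Flink does not bundle the MySQL JDBC driver, so it has to be added as a dependency manually: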
<!-- mysql connector -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
Implementation
The core of the code is this line:
streaming.addSink(new MyJdbcSink).setParallelism(1)
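The only argument needed here is MyJdbcSink, a custom JDBC sink. Note that this class should extend RichSinkFunction: its open() method can initialize runtime resources such as the JDBC connection and the prepared statements, and its close() method can clean them up. A plain SinkFunction has no such lifecycle hooks, so the connection would have to be created inside invoke(), that is, once for every record written. The source Javadoc for invoke() reads: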
Writes the given value to the sink. This function is called for every record.
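To make the cost difference concrete, here is a minimal plain-Scala sketch of the two styles. It does not use Flink at all: `FakeConnection`, `RichStyleSink`, `PlainStyleSink`, and the two `run…` helpers are hypothetical stand-ins that only mimic the call order (open once, invoke per record, close once) and count how many connections get created.

```scala
object LifecycleDemo {
  // Counts how many fake connections have been created in the current run.
  var connectionsCreated = 0

  // Stand-in for an expensive resource such as a JDBC connection (hypothetical).
  final class FakeConnection {
    connectionsCreated += 1
    def write(record: String): Unit = println(s"write: $record")
    def close(): Unit = ()
  }

  // Rich-style sink: the connection is created once in open() and reused.
  final class RichStyleSink {
    private var conn: FakeConnection = _
    def open(): Unit = { conn = new FakeConnection }
    def invoke(record: String): Unit = conn.write(record)
    def close(): Unit = if (conn != null) conn.close()
  }

  // Plain-style sink: no lifecycle hooks, so invoke() has to connect by itself.
  final class PlainStyleSink {
    def invoke(record: String): Unit = {
      val conn = new FakeConnection
      try conn.write(record) finally conn.close()
    }
  }

  // Drives the rich-style sink: open once, invoke per record, close once.
  def runRich(records: Seq[String]): Int = {
    connectionsCreated = 0
    val sink = new RichStyleSink
    sink.open()
    try records.foreach(sink.invoke) finally sink.close()
    connectionsCreated
  }

  // Drives the plain-style sink: only invoke() is available.
  def runPlain(records: Seq[String]): Int = {
    connectionsCreated = 0
    val sink = new PlainStyleSink
    records.foreach(sink.invoke)
    connectionsCreated
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("a", "b", "c")
    println(s"rich-style connections:  ${runRich(records)}")
    println(s"plain-style connections: ${runPlain(records)}")
  }
}
```

With three records, the rich-style run creates one connection and the plain-style run creates three, which is the overhead the text warns about.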
The complete code is as follows:
package org.ourhome.streamapi

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @Author Do
 * @Date 2020/4/18 22:23
 */
object MysqlSinkTest {

  // ip, port and database are placeholders; replace them with real values
  private val URL: String = "jdbc:mysql://ip:port/database?characterEncoding=utf8&useSSL=false"
  private val USER: String = "root"
  private val PASSWORD: String = "123456"

  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val runType: String = params.get("runtype")
    println("runType: " + runType)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000) // checkpoint every 5000 ms
    env.setStateBackend(new FsStateBackend("file:///D:/Work/Code/flinkdev/src/main/resources/checkpoint"))

    val inputStream: DataStream[String] = env.socketTextStream("host", 9000)

    // Parse each line of inputStream into a Person
    val streaming: DataStream[Person] = inputStream.map(line => {
      println(line)
      val strings: Array[String] = line.split(",")
      Person(strings(0).trim, strings(1).trim.toInt, strings(2).trim, strings(3).trim.toFloat)
    })

    streaming.addSink(new MyJdbcSink).setParallelism(1)

    env.execute("Mysql Sink")
  }

  case class Person(name: String, age: Int, gender: String, height: Float)

  /**
   * A plain SinkFunction only offers invoke(): "Writes the given value to the sink.
   * This function is called for every record."
   * A RichFunction adds lifecycle methods and initialization configuration, so the
   * connection is created once during initialization and reused afterwards.
   */
  class MyJdbcSink extends RichSinkFunction[Person] {
    // Fields: the JDBC connection and the prepared statements
    var conn: Connection = _
    var updateStmt: PreparedStatement = _
    var insertStmt: PreparedStatement = _

    // open() initializes the rich function's runtime resources, such as the JDBC connection
    override def open(parameters: Configuration): Unit = {
      // Prints "open(): initialize the JDBC connection and prepare the SQL"
      println("----------------------------open函数初始化JDBC连接及预编译sql-------------------------")
      super.open(parameters)
      conn = DriverManager.getConnection(URL, USER, PASSWORD)
      insertStmt = conn.prepareStatement("INSERT INTO person_message (name, age, gender, height) VALUES (?, ?, ?, ?)")
      updateStmt = conn.prepareStatement("UPDATE person_message set age = ?, gender = ?, height = ? where name = ?")
    }

    // Execute the SQL through the JDBC connection
    override def invoke(value: Person, context: SinkFunction.Context[_]): Unit = {
      // Prints "execute sql"
      println("-------------------------执行sql---------------------------")
      // Run the UPDATE first
      updateStmt.setInt(1, value.age)
      updateStmt.setString(2, value.gender)
      updateStmt.setDouble(3, value.height)
      updateStmt.setString(4, value.name)
      updateStmt.execute()
      // If the UPDATE matched no row, run the INSERT
      if (updateStmt.getUpdateCount == 0) {
        insertStmt.setString(1, value.name)
        insertStmt.setInt(2, value.age)
        insertStmt.setString(3, value.gender)
        insertStmt.setDouble(4, value.height)
        insertStmt.execute()
      }
    }

    // Clean up on shutdown; the null checks cover the case where open() failed part-way
    override def close(): Unit = {
      // Prints "close the connection and release resources"
      println("-----------------------关闭连接,并释放资源-----------------------")
      if (updateStmt != null) updateStmt.close()
      if (insertStmt != null) insertStmt.close()
      if (conn != null) conn.close()
    }
  }

}
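The sink above issues an UPDATE and falls back to an INSERT when no row was matched, which costs up to two statements per record. As a possible alternative (a swapped-in technique, not what the code above does), MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` can express the same intent in one statement. The sketch below assumes that `name` carries a PRIMARY KEY or UNIQUE index; `UpsertSketch` and `upsert` are hypothetical names introduced only for illustration.

```scala
import java.sql.PreparedStatement

object UpsertSketch {
  // MySQL-specific upsert. Assumption: `name` has a PRIMARY KEY or UNIQUE index,
  // otherwise the statement always inserts a new row.
  val UpsertSql: String =
    """INSERT INTO person_message (name, age, gender, height)
      |VALUES (?, ?, ?, ?)
      |ON DUPLICATE KEY UPDATE
      |  age = VALUES(age), gender = VALUES(gender), height = VALUES(height)""".stripMargin

  // Binds one record to a statement prepared from UpsertSql and executes it.
  // Returns the update count reported by the JDBC driver.
  def upsert(stmt: PreparedStatement,
             name: String, age: Int, gender: String, height: Float): Int = {
    stmt.setString(1, name)
    stmt.setInt(2, age)
    stmt.setString(3, gender)
    stmt.setFloat(4, height)
    stmt.executeUpdate()
  }
}
```

In MyJdbcSink, such a statement could be prepared once in open() and executed in invoke(), replacing the UPDATE/INSERT pair. Newer MySQL 8.0 releases deprecate the VALUES() function in this position in favor of row aliases, although it is still accepted.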
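Send data over the socket (the port must match the one passed to socketTextStream, which is 9000 in the code above):
nc -lk 9000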
小明,20,man,180.2
小红,22,woman,178.4
小黑,18,man,192.9
小兰,19,woman,188.0
小爱,30,woman,177.3
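Each input line is expected to have the form name,age,gender,height. The map step in the job splits on commas and converts the fields directly, so a malformed line (a missing field, a non-numeric age) would throw and fail the job. A more defensive parser might look like the sketch below; `PersonParser` and `parsePerson` are hypothetical names, and `Person` is redeclared here only to keep the example self-contained.

```scala
import scala.util.Try

object PersonParser {
  // Same shape as the Person case class used by the job.
  case class Person(name: String, age: Int, gender: String, height: Float)

  // Returns Some(person) for a well-formed "name,age,gender,height" line,
  // and None when the field count is wrong or a number cannot be parsed.
  def parsePerson(line: String): Option[Person] = {
    line.split(",").map(_.trim) match {
      case Array(name, age, gender, height) if name.nonEmpty =>
        Try(Person(name, age.toInt, gender, height.toFloat)).toOption
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    println(parsePerson("小明,20,man,180.2"))    // well-formed line
    println(parsePerson("bad line"))             // wrong number of fields
    println(parsePerson("小红,abc,woman,178.4")) // age is not a number
  }
}
```

In the job, bad lines could then be dropped instead of crashing it, for example with something like `inputStream.flatMap(line => PersonParser.parsePerson(line).toList)`, using the job's own Person class.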
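Output (the dashed banner lines are the println messages from open(), invoke(), and close(), in that order):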
--------------open函数初始化JDBC连接及预编译sql---------------
小明,20,man,180.2
-------------------------执行sql---------------------------
小红,22,woman,178.4
-------------------------执行sql---------------------------
小黑,18,man,192.9
-------------------------执行sql---------------------------
小兰,19,woman,188.0
-------------------------执行sql---------------------------
小爱,30,woman,177.3
-------------------------执行sql---------------------------
--------------------关闭连接,并释放资源----------------------
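As the output shows, the program first calls open() to create the JDBC connection and prepare the SQL statements, then reuses that connection to execute SQL for every record, and finally calls close() to close the connection and release resources. Note: the sink's parallelism is set to 1 in the code so that this lifecycle is easy to observe; with a higher parallelism, open() and close() run once per parallel sink instance.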
MySQL table data:
name | age | gender | height |
---|---|---|---|
小明 | 20 | man | 180.2 |
小红 | 22 | woman | 178.4 |
小黑 | 18 | man | 192.9 |
小兰 | 19 | woman | 188.0 |
小爱 | 30 | woman | 177.3 |
Note: the table person_message must be created beforehand.
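For reference, one way to create the table is sketched below. The column names come from the INSERT statement in the sink; the column types, lengths, primary key, and character set are assumptions chosen to fit the sample data, and `CreatePersonTable` and `createTable` are hypothetical names.

```scala
import java.sql.{Connection, DriverManager, Statement}

object CreatePersonTable {
  // Column names follow the sink's INSERT statement.
  // Types, lengths, the primary key, and the charset are assumptions.
  val CreateTableSql: String =
    """CREATE TABLE IF NOT EXISTS person_message (
      |  name   VARCHAR(64) NOT NULL PRIMARY KEY,
      |  age    INT,
      |  gender VARCHAR(16),
      |  height FLOAT
      |) DEFAULT CHARSET = utf8mb4""".stripMargin

  // Opens a connection, runs the DDL once, and always releases the resources.
  def createTable(url: String, user: String, password: String): Unit = {
    val conn: Connection = DriverManager.getConnection(url, user, password)
    try {
      val stmt: Statement = conn.createStatement()
      try stmt.execute(CreateTableSql) finally stmt.close()
    } finally conn.close()
  }
}
```

Calling `CreatePersonTable.createTable(url, user, password)` with the same JDBC URL and credentials as the job, before the job starts, would be enough; running the DDL by hand in a MySQL client works just as well.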