一、写在前面
在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。
二、代码示例
2.1 版本说明
1.14.6 2.4.3 2.8.5 1.4.9 2.3.5 1.8 2.11.8 8.0.22 2.11
2.2 导入相关依赖
org.apache.flink flink-connector-jdbc_2.11 ${flink.version} mysql mysql-connector-java 8.0.22
2.3 连接数据库,创建表
mysql> CREATE TABLE `ws` ( `id` varchar(100) NOT NULL ,`ts` bigint(20) DEFAULT NULL ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
2.4 创建POJO类
package com.flink.POJOs; import java.util.Objects; /** * TODO POJO类的特点 * 类是公有(public)的 * 有一个无参的构造方法 * 所有属性都是公有(public)的 * 所有属性的类型都是可以序列化的 */ public class WaterSensor { //类的公共属性 public String id; public Long ts; public Integer vc; //无参构造方法 public WaterSensor() { //System.out.println("调用了无参数的构造方法"); } public WaterSensor(String id, Long ts, Integer vc) { this.id = id; this.ts = ts; this.vc = vc; } //生成get和set方法 public void setId(String id) { this.id = id; } public void setTs(Long ts) { this.ts = ts; } public void setVc(Integer vc) { this.vc = vc; } public String getId() { return id; } public Long getTs() { return ts; } public Integer getVc() { return vc; } //重写toString方法 @Override public String toString() { return "WaterSensor{" + "id='" + id + '\'' + ", ts=" + ts + ", vc=" + vc + '}'; } //重写equals和hasCode方法 @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WaterSensor that = (WaterSensor) o; return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc); } @Override public int hashCode() { return Objects.hash(id, ts, vc); } } //scala的case类?
2.5 自定义map函数
package com.flink.POJOs; import org.apache.flink.api.common.functions.MapFunction; public class WaterSensorMapFunction implements MapFunction{ @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }
2.5 Flink2MySQL
package com.flink.DataStream.Sink; import com.flink.POJOs.WaterSensor; import com.flink.POJOs.WaterSensorMapFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.PreparedStatement; import java.sql.SQLException; /** * Flink 输出到 MySQL(JDBC) */ public class flinkSinkJdbc { public static void main(String[] args) throws Exception { //TODO 创建Flink上下文执行环境 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecutionEnvironment.setParallelism(1); //TODO Source DataStreamSourcedataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888); //TODO Transfer SingleOutputStreamOperator waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction()); /**TODO 写入 mysql * 1、只能用老的 sink 写法 * 2、JDBCSink 的 4 个参数: * 第一个参数: 执行的 sql,一般就是 insert into * 第二个参数: 预编译 sql, 对占位符填充值 * 第三个参数: 执行选项 ---->攒批、重试 * 第四个参数: 连接选项---->url、用户名、密码 */ SinkFunction sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)", new JdbcStatementBuilder () { @Override public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { preparedStatement.setString(1, waterSensor.getId()); preparedStatement.setLong(2, waterSensor.getTs()); preparedStatement.setInt(3, waterSensor.getVc()); System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")"); } } , JdbcExecutionOptions .builder() .withMaxRetries(3) // 重试次数 .withBatchSize(100) // 批次的大小:条数 .withBatchIntervalMs(3000) // 批次的时间 .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("********") .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间 .build() ); //TODO 写入到Mysql waterSensorSingleOutputStreamOperator.addSink(sinkFunction); streamExecutionEnvironment.execute(); } }
2.6 启动necat、Flink,观察数据库写入情况
nc -lk 9999 #启动necat、并监听8888端口,写入数据
启动Flink程序
查看数据库写入是否正常
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章