上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink 将数据写入MySQL(JDBC)

guduadmin112小时前

一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

        1.14.6
        2.4.3
        2.8.5
        1.4.9
        2.3.5
        1.8
        2.11.8
        8.0.22
        2.11

2.2 导入相关依赖

 
    org.apache.flink
    flink-connector-jdbc_2.11
    ${flink.version}



   mysql
   mysql-connector-java
   8.0.22

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` ( 
      `id` varchar(100) NOT NULL
      ,`ts` bigint(20) DEFAULT NULL
      ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) 
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;
import java.util.Objects;
/**
 * TODO POJO类的特点
 * 类是公有(public)的
 * 有一个无参的构造方法
 * 所有属性都是公有(public)的
 * 所有属性的类型都是可以序列化的
 */
public class WaterSensor {
    //类的公共属性
    public String id;
    public Long ts;
    public Integer vc;
    //无参构造方法
    public WaterSensor() {
        //System.out.println("调用了无参数的构造方法");
    }
    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    //生成get和set方法
    public void setId(String id) {
        this.id = id;
    }
    public void setTs(Long ts) {
        this.ts = ts;
    }
    public void setVc(Integer vc) {
        this.vc = vc;
    }
    public String getId() {
        return id;
    }
    public Long getTs() {
        return ts;
    }
    public Integer getVc() {
        return vc;
    }
    //重写toString方法
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
    //重写equals和hasCode方法
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
    }
    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}
//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;
import org.apache.flink.api.common.functions.MapFunction;
public class WaterSensorMapFunction implements MapFunction {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] datas = value.split(",");
        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
    }
}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;
import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Flink 输出到 MySQL(JDBC)
 */
public class flinkSinkJdbc {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        //TODO Source
        DataStreamSource dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO Transfer
        SingleOutputStreamOperator waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
        /**TODO 写入 mysql
         * 1、只能用老的 sink 写法
         * 2、JDBCSink 的 4 个参数:
         *   第一个参数: 执行的 sql,一般就是 insert into
         *   第二个参数: 预编译 sql, 对占位符填充值
         *   第三个参数: 执行选项 ---->攒批、重试
         *   第四个参数: 连接选项---->url、用户名、密码
         */
        SinkFunction sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
                new JdbcStatementBuilder() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                        System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
                    }
                }
                , JdbcExecutionOptions
                        .builder()
                        .withMaxRetries(3)         // 重试次数
                        .withBatchSize(100)        // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        //TODO 写入到Mysql
        waterSensorSingleOutputStreamOperator.addSink(sinkFunction);
        streamExecutionEnvironment.execute();
    }
}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

Flink 将数据写入MySQL(JDBC),在这里插入图片描述,第1张

启动Flink程序

Flink 将数据写入MySQL(JDBC),在这里插入图片描述,第2张

查看数据库写入是否正常

Flink 将数据写入MySQL(JDBC),在这里插入图片描述,第3张

网友评论

搜索
最新文章
热门文章
热门标签