上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

基于Flink SQL CDC Mysql to Mysql数据同步

guduadmin13小时前

基于Flink SQL CDC Mysql to Mysql数据同步

Flink CDC有两种方式同步数据库:

  • 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步;
  • 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行。

    本方案使用FlinkSQL方法,同步两表中的数据。

    其中Flink应用可以部署在具有公网IP的服务器上,同时可以连接其他局域网中服务器的数据进行同步工作,如不需要操作管理页面,则不对服务器IP有要求。

    一、 服务器部署Flink

    将Flink压缩包解压到服务器指定位置

    Flink下载地址,点击进入

    基于Flink SQL CDC Mysql to Mysql数据同步,在这里插入图片描述,第1张

    二、 配置Flink

    1.配置端口号

    进入到 根目录\conf\flink-conf.yaml文件,找到rest.port,将其配置为已开放的端口

    基于Flink SQL CDC Mysql to Mysql数据同步,在这里插入图片描述,第2张

    2.配置rest.bind地址

    找到rest.port位置,将其参数localhost改为0.0.0.0,不修改会导致管理页面无法正常打开。

    基于Flink SQL CDC Mysql to Mysql数据同步,在这里插入图片描述,第3张

    三、 上传两个jar包到lib中

    上面的是同步到目标mysql库的连接器

    下面的是源mysql的连接器

    下载地址:

    flink-connector-jdbc:jar:3.0.0-1.16

    flink-sql-connector-mysql-cdc:jar:2.3.0

    基于Flink SQL CDC Mysql to Mysql数据同步,在这里插入图片描述,第4张

    四、 启动服务

    进入到bin文件夹中

    1. 运行start-cluster.sh

    2. 运行sql-client.sh进入到FlinkSQL模式

    创建两个表映射、一个启动任务指令
    

    1)源数据库映射

    create table ny_energy_data_source
    (
        id             bigint ,
        enterprise_id  bigint     ,
        use_time       timestamp   ,
        date_type      int        ,
        attribute_id   bigint     ,
        PRIMARY KEY (`id`) NOT ENFORCED
    )WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'ip地址',
        'port' = '3306',
        'username' = '用户名',
        'password' = '密码',
        'database-name' = '源数据库名',
        'table-name' = '表名'
    );
    

    2)目标数据库映射

    	create table ny_energy_data_target
    (
        id             bigint ,
        enterprise_id  bigint     ,
        use_time       timestamp   ,
        date_type      int        ,
        attribute_id   bigint     ,
        PRIMARY KEY (`id`) NOT ENFORCED
    )WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://IP地址:3306/库名?serverTimezone=UTC',
        'username' = '用户名',
        'password' = '密码',
        'table-name' = '表名',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'scan.fetch-size' = '200'
    );
    

    3)启动任务

    从ny_energy_data_source到ny_energy_data_target,先全量后增量

    insert into ny_energy_data_target select * from ny_energy_data_source;
    

    五、 进入到http://IP:8081(默认端口),查看任务运行情况

    基于Flink SQL CDC Mysql to Mysql数据同步,在这里插入图片描述,第5张

网友评论

搜索
最新文章
热门文章
热门标签