上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

guduadmin14小时前

文章目录

  • 一、PostgreSQL作为数据来源(source),由flink读取
    • 1.postgre安装与配置
    • 2.flink安装与配置
    • 3.flink cdc postgre配置
      • 3.1 postgre配置(for flink cdc)
      • 3.2 flink cdc postgres的jar包下载
      • 4.flink cdc postgre测试
      • 二、Tidb作为数据去向(sink),由flink写入
        • 1.tidb安装与配置
        • 2.flink cdc tidb的jar包下载
        • 3.flink cdc tidb测试
        • 三、用Flink SQL Client同步PostgreSQL到Tidb

          操作系统:ubuntu-22.04,运行于wsl 2【 注意,请务必使用wsl 2;wsl 1会出现各种各样的问题】

          软件版本:PostgreSQL 14.9,TiDB v7.3.0,flink 1.7.1,flink cdc 2.4.0

          一、PostgreSQL作为数据来源(source),由flink读取

          1.postgre安装与配置

          已有postgre的跳过此步

          (1)pg安装

          https://zhuanlan.zhihu.com/p/143156636

          sudo apt install postgresql
          sudo -u postgres psql -c "SELECT version();"
          sudo -u postgres psql # 连接进入postgre shell(以管理员用户)
          

          (2)pg配置

          # 创建新用户和数据库
          sudo su - postgres -c "createuser domeya"
          sudo su - postgres -c "createdb domeya_db"
          sudo -u postgres psql # 进入psql(管理员用户postgres)
          grant all privileges on database domeya_db to domeya; # 授权用户操作数据库
          \password # 设置当前用户密码(\password domeya,可设置用户domeya的密码)
          \q # 退出psql
          # psql postgres://username:password@host:port/dbname  
          psql postgres://domeya:123@localhost:5432/domeya_db # 新用户测试连接
          

          可能出现的问题

          sudo -u postgres psql报错:

          psql: error: connection to server on socket “/var/run/postgresql/.s.PGSQL.5432” failed: No such file or directory

          Is the server running locally and accepting connections on that socket?

          https://stackoverflow.com/questions/69639250/pgconnectionbad-connection-to-server-on-socket-var-run-postgresql-s-pgsql

          https://blog.csdn.net/psiitoy/article/details/7310003

          【解决关键】:重启pg服务

          # 重启
          sudo service postgresql restart # 重要!
          ps -ef | grep postgres
          

          (可选)重装pg

          # 卸载
          dpkg --list | grep postgresql
          dpkg --purge postgresql postgresql-14 postgresql-client-14 postgresql-client-common postgresql-common # 根据dpkg --list | grep postgresql中展示的结果进行填写
          # rm -rf /var/lib/postgresql/
          # 重装
          sudo apt install postgresql
          

          2.flink安装与配置

          已有flink的跳过此步

          flink安装,配置环境变量

          # https://flink.apache.org/downloads/
          curl -O -L https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
          tar zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt
          sudo vim /etc/profile.d/flink.sh
          # flink.sh
          export FLINK_HOME=/opt/flink-1.17.1
          export PATH=$PATH:$FLINK_HOME/bin
          source /etc/profile
          

          如果webUI无法外机访问把rest.bind-address: 0.0.0.0这个设置放开权限即可

          cd $FLINK_HOME/conf
          cp flink-conf.yaml flink-conf.yaml.backup
          vim flink-conf.yaml
          # 修改以下设置
          rest.bind-address: 0.0.0.0
          

          启动flink

          cd $FLINK_HOME
          ./bin/start-cluster.sh # 启动flink
          jps # 查看是否启动StandaloneSessionClusterEntrypoint, TaskManagerRunner
          # ./bin/stop-cluster.sh # 关闭flink
          

          3.flink cdc postgre配置

          3.1 postgre配置(for flink cdc)

          https://www.cnblogs.com/xiongmozhou/p/14817641.html

          (1)修改配置文件

          cd /etc/postgresql/14/main
          cp postgresql.conf postgresql.conf.backup
          vim postgresql.conf
          

          postgresql.conf修改几个关键配置如下:

          # 更改wal日志方式为logical
          wal_level = logical # minimal, replica, or logical
          # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
          max_replication_slots = 20 # max number of replication slots
          # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
          max_wal_senders = 20 # max number of walsender processes
          # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
          wal_sender_timeout = 180s # in milliseconds; 0 disable  
          

          修改完之后重启postgresql,service postgresql restart

          (2)赋予权限

          以管理员进入psql,sudo -u postgres psql

          (可选)如果没有测试表,可以新建一个

          -- 如果没有测试表,可创建一个
          CREATE TABLE test_table1(
             id varchar(8),
             p_dt varchar(8)
          );
          -- 查看表
          \d
          insert into test_table1 values('1', '20230820');
          select * from test_table1;
          

          赋予普通用户复制流权限、发布表、更改表的复制标识包含更新和删除的值

          -- 给用户复制流权限
          ALTER ROLE domeya replication;
          -- 查看权限
          \du
          \c domeya_db -- 重要:进入到domeya_db数据库(以管理员账号进入)
          -- 设置发布为true
          update pg_publication set puballtables=true where pubname is not null;
          -- 把所有表进行发布(包括以后新建的表);
          -- 注意,此处PUBLICATION名字必须为dbz_publication,否则后续flink sql报错must be superuser to create FOR ALL TABLES publication
          CREATE PUBLICATION dbz_publication FOR ALL TABLES;
          -- 查询哪些表已经发布
          select * from pg_publication_tables;
          -- 更改复制标识包含更新和删除之前值
          ALTER TABLE test_table1 REPLICA IDENTITY FULL; -- 对应前面创建的测试表
          -- 查看复制标识(为f标识说明设置成功)
          select relreplident from pg_class where relname='test_table1'; -- 对应前面创建的测试表
          -- 退出
          \q
          

          wal_level = logical源表的数据修改时,默认的逻辑复制流只包含历史记录的primary key,如果需要输出更新记录的历史记录的所有字段,需要在表级别修改参数:ALTER TABLE tableName REPLICA IDENTITY FULL; 这样才能捕获到源表所有字段更新后的值

          发布所有表可能太多,也可以创建publication,添加指定表到publication。

          update pg_publication set puballtables=false where pubname is not null; -- 默认发布所有表为false
          CREATE PUBLICATION flink_cdc_publication;
          alter publication flink_cdc_publication add table test_table1;
          select * from pg_publication;
          select * from pg_publication_tables;
          

          3.2 flink cdc postgres的jar包下载

          下载flink cdc postgres相关jar包,放在$FLINK_HOME/lib

          cd $FLINK_HOME/lib
          # 以下用于flink cdc postgres连接
          # 注意:用于flink sql的jar包是flink-sql-connector-postgres-cdc,不是flink-connector-postgres-cdc
          # https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.0
          wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4.0/flink-sql-connector-postgres-cdc-2.4.0.jar
          

          如果flink在运行状态,需要重启flink,之后再启动flink sql client

          cd $FLINK_HOME
          ./bin/stop-cluster.sh
          ./bin/start-cluster.sh
          

          4.flink cdc postgre测试

          https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html【官方文档demo】

          启动flink sql client(之前重启了flink cluster)

          cd $FLINK_HOME
          ./bin/sql-client.sh
          

          在flink sql client创建表,与pg中的表结构对应,表名字可以不同

          CREATE TABLE source_table (
              id STRING,
              p_dt STRING
          ) WITH (
              'connector' = 'postgres-cdc',
              'hostname' = 'localhost',
              'port' = '5432',
              'username' = 'domeya',
              'password' = '123',
              'database-name' = 'domeya_db',
              'schema-name' = 'public',
              'table-name' = 'test_table1',
              'slot.name' = 'flink',
              -- experimental feature: incremental snapshot (default off)
              -- 'scan.incremental.snapshot.enabled' = 'true'
              'decoding.plugin.name' = 'pgoutput' -- 必须加,否则报错could not access file "decoderbufs"
          );
          select * from source_table;
          

          可能出现的问题

          运行select * from source_table;时报错

          报错1:

          [ERROR] Could not execute SQL statement. Reason:

          org.postgresql.util.PSQLException: ERROR: could not access file “decoderbufs”: No such file or directory

          https://github.com/ververica/flink-cdc-connectors/issues/37

          table sql加:WITH('decoding.plugin.name' = 'pgoutput')【flink sql】

          dataStream加:PostgreSQLSource.builder().decodingPluginName("pgoutput").build()

          报错2:

          [ERROR] Could not execute SQL statement. Reason:

          org.postgresql.util.PSQLException: ERROR: must be superuser to create FOR ALL TABLES publication

          https://gist.github.com/alexhwoods/4c4c90d83db3c47d9303cb734135130d

          检查之前的操作(psql):

          CREATE PUBLICATION dbz_publication FOR ALL TABLES;
          select * from pg_publication_tables;
          

          报错3:

          Caused by: org.postgresql.util.PSQLException: ERROR: replication slot “flink” already exists

          https://zhuanlan.zhihu.com/p/449066277

          当上面debezium.slot.name的值超过20个,就会报错,即使之前的job已经下线,这个slot文件依旧在,此时需要执行下面语句并删除slot即可:

          psql:

          -- https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION
          SELECT slot_name, slot_type, active FROM pg_replication_slots;
          SELECT pg_drop_replication_slot('flink'); # 这个和之前flink sql中的'slot.name' = 'flink'对应
          

          注意,flink postgres-cdc只能读(作为source),不能写(作为sink)

          Flink SQL> insert into source_table values('3', '20230820');
          [ERROR] Could not execute SQL statement. Reason:
          org.apache.flink.table.api.ValidationException: Connector 'postgres-cdc' can only be used as a source. It cannot be used as a sink.
          

          二、Tidb作为数据去向(sink),由flink写入

          1.tidb安装与配置

          已有tidb的跳过此步

          https://docs.pingcap.com/zh/tidb/stable/quick-start-with-tidb

          su xxx # 切换到你的普通用户
          curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh
          source /home/xxx/.bashrc # 按上个命令输出的路径来,上面显示的是Shell profile:  /home/xxx/.bashrc
          tiup playground # 下载镜像,并启动某个版本的集群
          
          • 以这种方式执行的 playground,在结束部署测试后 TiUP 会清理掉原集群数据,重新执行该命令后会得到一个全新的集群。
          • 若希望持久化数据,可以执行 TiUP 的 --tag 参数:tiup --tag playground ...

          下载完毕,启动成功之后展示信息:

          Connect TiDB: mysql --comments --host 127.0.0.1 --port 4000 -u root

          TiDB Dashboard: http://127.0.0.1:2379/dashboard

          Grafana: http://127.0.0.1:3000

          连接tidb

          # 使用mysql client连接tidb
          sudo apt install mysql-client
          mysql --comments --host 127.0.0.1 --port 4000 -u root
          # 设置root密码 
          # https://docs.pingcap.com/zh/tidb/stable/user-account-management#%E8%AE%BE%E7%BD%AE%E5%AF%86%E7%A0%81
          # https://blog.csdn.net/qq_45675449/article/details/106866700
          SET PASSWORD FOR 'root'@'%' = '123'; # root的localhost是%,可通过 select user,host from mysql.user; 查看
          exit;
          # mysql -uroot -p无法连接,必须加上port和host,并且host不能写成localhost
          # https://blog.csdn.net/hjf161105/article/details/78850658
          mysql -uroot --port 4000 -h 127.0.0.1 -p
          

          2.flink cdc tidb的jar包下载

          下载用于jdbc mysql连接的jar包,用于flink cdc tidb连接。

          特别注意:Tidb的sink模式得用jdbc+mysql连接,不用官方提供的tidb cdc因为其不能作为sink,只能曲线救国参考这种方法了。

          cd $FLINK_HOME/lib
          # 以下用于jdbc mysql(用于flink cdc tidb连接)
          # https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc/3.1.1-1.17
          wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
          # https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.1.0
          wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.1.0/mysql-connector-j-8.1.0.jar
          

          如果flink在运行状态,需要重启flink,之后再启动flink sql client

          cd $FLINK_HOME
          ./bin/stop-cluster.sh
          ./bin/start-cluster.sh
          

          3.flink cdc tidb测试

          基于Flink CDC实时同步数据(MySQL到MySQL)

          flink cdc tidb 官方文档demo(无法作为sink,只能作为source)

          (可选)tidb创建测试表

          # mysql -uroot --port 4000 -h 127.0.0.1 -p
          # 创建测试表
          CREATE TABLE test.test_table1(
             id varchar(8),
             p_dt varchar(8)
          );
          insert into test.test_table1 values('3', '20230819');
          

          启动flink sql client(之前重启了flink cluster)

          cd $FLINK_HOME
          ./bin/sql-client.sh
          

          flink sql连接tidb,仿照mysql的连接

          -- checkpoint every 3000 milliseconds                       
          SET 'execution.checkpointing.interval' = '3s';
          -- register a TiDB table in Flink SQL
          CREATE TABLE sink_table (
              id STRING,
              p_dt STRING,
              PRIMARY KEY(id) NOT ENFORCED 
              -- 必须写PRIMARY KEY,否则报错:[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.
           ) WITH (
              'connector' = 'jdbc',
              'url' = 'jdbc:mysql://localhost:4000/test',
              'driver' = 'com.mysql.cj.jdbc.Driver',
              'username' = 'root',
              'password' = '123',
              'table-name' = 'test_table1'
          );
            
          -- read snapshot and binlogs from table
          SELECT * FROM sink_table;
          

          三、用Flink SQL Client同步PostgreSQL到Tidb

          # 将会提交一个作业,进行source_table->sink_table的单向同步
          insert into sink_table select * from source_table;
          

          [INFO] Submitting SQL update statement to the cluster…

          [INFO] SQL update statement has been successfully submitted to the cluster:

          Job ID: 8e47bfa3ea78da4c47b395f7517c2812

          在flink web ui上可以看到作业运行状态。

          只要这个作业是正常runnning,那么对source_table的任何修改都会同步到sink_table。注意这种是单向同步,source_table的变动(增/删/改)会同步到sink_table,但反过来sink_table的变动不会影响到source_table(不会触发source_table->sink_table的同步)。

网友评论

搜索
最新文章
热门文章
热门标签