上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

guduadmin11天前

环境说明:

flink 1.15.2

Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

mysql 版本:5.7

windows11 IDEA 本地运行

先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation

1. Oracle 开启 log archiving

(1).启用 log archiving

        a:以DBA用户连接数据库 

             sqlplus / as sysdba

        b:启用 log archiving (会重启数据库)

             alter system set db_recovery_file_dest_size = 10G;

             alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;

             shutdown immediate;

             startup mount;

             alter database archivelog;

             alter database open;

        c:检查 log archiving 是否开启  -- Should now "Database log mode: Archive Mode"

             archive log list;

    (2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。

        为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列;

            ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

        —为数据库启用补充日志修改数据库添加补充日志数据;

            ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    (3).创建具有权限的Oracle用户

        a:创建表空间

            sqlplus / as sysdba

              CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

              exit;

          b:创建用户并赋权  flinkuser  flinkpw 

            sqlplus / as sysdba

              CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

              GRANT CREATE SESSION TO flinkuser;

              GRANT SET CONTAINER TO flinkuser;

              GRANT SELECT ON V_$DATABASE to flinkuser;

              GRANT FLASHBACK ANY TABLE TO flinkuser;

              GRANT SELECT ANY TABLE TO flinkuser;

              GRANT SELECT_CATALOG_ROLE TO flinkuser;

              GRANT EXECUTE_CATALOG_ROLE TO flinkuser;

              GRANT SELECT ANY TRANSACTION TO flinkuser;

              GRANT LOGMINING TO flinkuser;

            

              GRANT CREATE TABLE TO flinkuser;

              GRANT LOCK ANY TABLE TO flinkuser;

              GRANT ALTER ANY TABLE TO flinkuser;

              GRANT CREATE SEQUENCE TO flinkuser;

            

              GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;

              GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

            

              GRANT SELECT ON V_$LOG TO flinkuser;

              GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;

              GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;

              GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;

              GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;

              GRANT SELECT ON V_$LOGFILE TO flinkuser;

              GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;

              GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

              exit;

2. Oracle 建表,并配置补充日志

CREATE TABLE "USER_INFO" (    

ID NUMBER, 

USERNAME VARCHAR2(255), 

PASSWORD VARCHAR2(255), 

PRIMARY KEY (ID));

ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

3. Mysql 建表

CREATE TABLE user_new (

  id int(11) NOT NULL,

  username varchar(255) DEFAULT NULL,

  password varchar(255) DEFAULT NULL,

  PRIMARY KEY (id)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.Maven依赖

 
        8
        8
        1.15.2
    
    
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
        
        
            org.apache.flink
            flink-runtime-web
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner_2.12
            ${flink.version}
            
        
        
            org.apache.flink
            flink-connector-jdbc
            ${flink.version}
            
            
        
        
            mysql
            mysql-connector-java
            8.0.29
            
        
        
            org.projectlombok
            lombok
            1.18.22
        
        
        
            com.ververica
            flink-sql-connector-mysql-cdc
            2.3.0
            
        
        
            org.apache.flink
            flink-connector-jdbc
            1.15.2
            
            
        
        
            org.apache.flink
            flink-connector-base
            ${flink.version}
            
        
        
        
            com.ververica
            flink-sql-connector-oracle-cdc
            2.3.0
            
        
        
            org.apache.logging.log4j
            log4j-slf4j-impl
            2.12.1
        
        
            org.slf4j
            slf4j-simple
            1.7.15
        
        
            org.apache.logging.log4j
            log4j-core
            2.17.2
        
        
        
            org.apache.logging.log4j
            log4j-api
            2.17.2
        
        
        
            log4j
            log4j
            1.2.9
        
    

5.demo如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OracleCdcToMysql {
    public static void main(String[] args) {
        //1.获取stream的执行环境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        String sourceTable = "CREATE TABLE oracle_cdc_source " +
                "( ID INT, " +
                "USERNAME STRING, " +
                "PASSWORD STRING, " +
                "PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" +
                "'connector' = 'oracle-cdc',\n" +
                "'hostname' = '1.1.1.1',\n" +
                "'port' = '1521',\n" +
                "'username' = 'flinkcdcuser',\n" +
                "'password' = 'flinkpw',\n" +
                "'database-name' = 'LMDB',\n" +//select name from v$database;
                "'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual;
                "'debezium.snapshot.mode' = 'schema_only',\n" +
                //snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。
                //snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。
                "'scan.incremental.snapshot.enabled' = 'true',\n" +
                //scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括:
                // (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。
                "'scan.incremental.snapshot.chunk.size' = '8096' ,\n" +
                //表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。
                "'scan.snapshot.fetch.size' = '1024',\n" +
                //读取表快照时每个轮询的最大读取大小。
                "'connect.max-retries' = '3',\n" +
                //连接器应该重试构建Oracle数据库服务器连接的最大重试次数。
                "'connection.pool.size'= '20',\n" +
                //连接池大小
                "'debezium.log.mining.strategy' = 'online_catalog',\n" +
                //online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。
                // 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。
                "'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" +
                "'debezium.log.mining.continuous.mine'='true'," +
                "  'table-name' = 'USER_INFO'\n" +
                ")";
        tEnv.executeSql(sourceTable);
//        tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  ID INT,\n" +
                "  USERNAME STRING,\n" +
                "  PASSWORD STRING,\n" +
                "PRIMARY KEY(ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" +
                "'username' = 'flink_cdc_user',\n" +
                "'password' = 'flink@cdc',\n"+
                "  'table-name' = 'user_new',\n" +
                "  'connection.max-retry-timeout' = '60s'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source");
    }
}

本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。

下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql,第1张

 下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。

Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql,第2张

具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。

具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心

6.打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。

网友评论

搜索
最新文章
热门文章
热门标签