上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法

guduadmin118小时前

flinkCDC - 功能验证记录

  • flink 与cdc 版本使用搭配:
  • flink cdc
    • 参数说明
    • 原理分析
    • (DBLog)无锁算法论文
    • mysql cdc
      • cdc api 动态加表
      • flink cdc sql 性能压测
      • flink cdc api 性能压测
      • PostgreSqlCDC
        • 执行更新语句,会出现 2 种情况
        • cdc sink to kafka
        • 报错
        • mysql时区错误,The server time zone value 'EDT' is unrecognized or represents
        • java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig
        • Cannot discover a connector using option: 'connector'='mysql-cdc'
        • Could not instantiate the executor. Make sure a planner module is on the classpath
        • (source 算子 )The TaskExecutor is shutting down.

          flink 与cdc 版本使用搭配:

          flink1.13.6 + flink mysql cdc 1.4.0

          flink 1.16.0 + flink mysql cdc 2.3.0

          flink 1.16.0 + flink mysql cdc 2.4.0

          flink 1.16.0 + flink postgresql cdc 2.3.0

          flink 1.13.6 + flink mysql cdc 2.3.0 : 没有报错,没有数据,估计是兼容有问题

          flink cdc

          参数说明

          1、调整chunck大小 : scan.incremental.snapshot.chunk.size

          2、设置cdc模式:scan.startup.mode【initial(默认)、latest-offset】

          3、支持chunk key 列设置,默认是第一个字段:scan.incremental.snapshot.chunk.key-column

          官网:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html

          原理分析

          1、cdc mysql 全量快照阶段split sql :SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?;

          备注:id 是主键id

          (DBLog)无锁算法论文

          链接地址:https://arxiv.org/pdf/2010.12597.pdf , 对此算法感兴趣的可以看这位大佬的分享:https://zhuanlan.zhihu.com/p/600303844

          论文部分摘要理解:

          • 全量阶段:

            1、flink cdc 任务启动后按设置的chunk size切分数据,sql如下:

            (sql:SELECT * FROM cdc_db.tablename WHERE id >= ? AND NOT (id = ?) AND id <= ?; )

            2、同时会启动读取binlog任务,读取chunk对应的binlog,通过binlog对 select chunk的数据做合并操作,此操作是合并在期间执行了update、delete操作,保证insert-only

          • 增量阶段:

            1、不断追加数据

            flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,在这里插入图片描述,第1张

            flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,在这里插入图片描述,第2张

            mysql cdc

            cdc api 动态加表

            1、启动任务,复制checkpoint路径

            flink Mysql CDC(动态加表)、postgresqlCDC 和 CDC无锁算法,在这里插入图片描述,第3张2、新增监听的表到tableList(可以使用同一个jar包,在外部传参动态加表)

            3、从checkpoint初重启任务即可

            flink cdc sql 性能压测

            1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

            flink cdc api 性能压测

            1、cdc mysql sink to kafka :一个takmanager , 4个slot , source 并发度4,sink kafka 并发度1 ,最高写入2.8W条/s

            PostgreSqlCDC

            执行更新语句,会出现 2 种情况

            1、若更新字段包含(部分)主键字段,会先发送一条删除之前主键的记录。op = d , after = null ; 然后再发送一条新主键记录,op = c,且before = null 。

            2、若仅更新非主键主键,只会发送一条记录,op = u , before = null。

            主体代码如下:

                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    // 必须开启 checkpoint ,因为Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN,否则磁盘使用率会一直很高
                    env.enableCheckpointing(1000);
                    //监听 postgresql wal 日志
                    DebeziumSourceFunction sourceFunction = PostgreSQLSource.builder()
                            .hostname(host)
                            .port(port)
                            .username(userName)
                            .password(passWord)
                            .database(dbName)
                            .tableList(tableList)
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .slotName(slotName)
                            .build();
                    DataStreamSource dataStreamSource = env.addSource(sourceFunction);
                    dataStreamSource.print(">>>").setParallelism(1);
                    env.execute();
            

            cdc sink to kafka

            AT_LEAST_ONCE 模型要配置 acks = 1

            报错

            mysql时区错误,The server time zone value ‘EDT’ is unrecognized or represents

            登录mysql并查询当前时区:show variables like “%time_zone%”;

            执行以下命令修改时区:

            set global time_zone = '+8:00'; ##修改mysql全局时区为北京时间,即我们所在的东8区
            set time_zone = '+8:00'; ##修改当前会话时区
            flush privileges; #立即生效
            

            java.lang.NoClassDefFoundError: io/debezium/connector/mysql/MySqlConnectorConfig

            缺包,引入 debezium-connector-mysql-1.6.4.Final.jar包会报Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig,看 flink cdc 社区群反馈可能是

            Cannot discover a connector using option: ‘connector’=‘mysql-cdc’

            删除pom.xml文件flink-connector-mysql-cdc依赖报下的provided

            Could not instantiate the executor. Make sure a planner module is on the classpath

            包冲突原因。注释或删除jar:flink-table-planner-loader-1.16.0.jar

            (source 算子 )The TaskExecutor is shutting down.

            加大心跳间隔时间,默认是30s,‘heartbeat.interval’ = ‘60s’

网友评论

搜索
最新文章
热门文章
热门标签