上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

一文弄懂Flink CDC

guduadmin21天前

文章目录

      • 1.CDC概述
      • 2.CDC 的实现原理
      • 3.为什么选 Flink
      • 4.支持的连接器
      • 5.支持的 Flink 版本
      • 6.Flink CDC特性
      • 7.用法实例
        • 7.1DataStream API 的用法(推荐)
        • 7.2Table/SQL API的用法

          1.CDC概述

          CDC(Change Data Capture)是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。

          传统上,数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是,这种轮询的方式效率低下且不能实时反应变化。而 CDC 技术则通过在数据源上设置一种机制,使得变化的数据可以被实时捕获并传递给下游处理系统,从而实现了实时的数据变动监控。

          Flink 作为一个强大的流式计算引擎,提供了内置的 CDC 功能,能够连接到各种数据源(如数据库、消息队列等),捕获其中的数据变化,并进行灵活的实时处理和分析。

          通过使用 Flink CDC,我们可以轻松地构建实时数据管道,对数据变动进行实时响应和处理,为实时分析、实时报表和实时决策等场景提供强大的支持。

          一文弄懂Flink CDC,Flink_CDC,第1张

          2.CDC 的实现原理

          通常来讲,CDC 分为主动查询和事件接收两种技术实现模式。对于主动查询而言,用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。综合来看,事件接收模式整体在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用Debezium来实现变更数据的捕获(下图来自Debezium 官方文档如果使用的只有 MySQL,则还可以用Canal。

          一文弄懂Flink CDC,在这里插入图片描述,第2张

          3.为什么选 Flink

          从上图可以看到,Debezium 官方架构图中,是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块,因为 Flink 相对 Kafka Streams 而言,有如下优势:

          • 强大的流处理引擎: Flink 是一个强大的流处理引擎,具备高吞吐量、低延迟、Exactly-Once 语义等特性。它通过基于事件时间的处理模型,支持准确和有序的数据处理,适用于实时数据处理和分析场景。这使得 Flink 成为实现 CDC 的理想选择。

          • 内置的 CDC 功能: Flink 提供了内置的 CDC 功能,可以直接连接到各种数据源,捕获数据变化,并将其作为数据流进行处理。这消除了我们自行开发或集成 CDC 解决方案的需要,使得实现 CDC 变得更加简单和高效。

          • 多种数据源的支持: Flink CDC 支持与各种数据源进行集成,如关系型数据库(如MySQL、PostgreSQL)、消息队列(如Kafka、RabbitMQ)、文件系统等。这意味着无论你的数据存储在哪里,Flink 都能够轻松地捕获其中的数据变化,并进行进一步的实时处理和分析。

          • 灵活的数据处理能力: Flink 提供了灵活且强大的数据处理能力,可以通过编写自定义的转换函数、处理函数等来对 CDC 数据进行各种实时计算和分析。同时,Flink 还集成了 SQL 和 Table API,为用户提供了使用 SQL 查询语句或 Table API 进行简单查询和分析的方式。

          • 完善的生态系统: Flink 拥有活跃的社区和庞大的生态系统,这意味着你可以轻松地获取到丰富的文档、教程、示例代码和解决方案。此外,Flink 还与其他流行的开源项目(如Apache Kafka、Elasticsearch)深度集成,提供了更多的功能和灵活性。

            4.支持的连接器

            连接器数据库Driver
            mongodb-cdcMongoDB: 3.6, 4.x, 5.0MongoDB Driver: 4.3.4
            mysql-cdcMySQL: 5.6, 5.7, 8.0.x
            RDS MySQL: 5.6, 5.7, 8.0.x
            PolarDB MySQL: 5.6, 5.7, 8.0.x
            Aurora MySQL: 5.6, 5.7, 8.0.x
            MariaDB: 10.x
            PolarDB X: 2.0.1
            JDBC Driver: 8.0.28
            oceanbase-cdcOceanBase CE: 3.1.x, 4.x
            OceanBase EE: 2.x, 3.x, 4.x
            OceanBase Driver: 2.4.x
            oracle-cdcOracle: 11, 12, 19, 21Oracle Driver: 19.3.0.0
            postgres-cdcPostgreSQL: 9.6, 10, 11, 12, 13, 14JDBC Driver: 42.5.1
            sqlserver-cdcSqlserver: 2012, 2014, 2016, 2017, 2019JDBC Driver: 9.4.1.jre8
            tidb-cdcTiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0JDBC Driver: 8.0.27
            db2-cdcDb2: 11.5Db2 Driver: 11.5.0.0
            vitess-cdcVitess: 8.0.x, 9.0.xMySql JDBC Driver: 8.0.26

            5.支持的 Flink 版本

            Flink CDC版本Flink 版本_
            1.0.01.11.*
            1.1.01.11.*
            1.2.01.12.*
            1.3.01.12.*
            1.4.01.13.*
            2.0.*1.13.*
            2.1.*1.13.*
            2.2.*1.13.* , 1.14.*
            2.3.*1.13.* , 1.14.* , 1.15.* , 1.16.0
            2.4.*1.13.* , 1.14.* , 1.15.* , 1.16.* , 1.17.0

            6.Flink CDC特性

            1. 支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理
            2. DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka
            3. Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改

            下表显示了连接器的当前特性:

            连接器无锁读并行读一次性语义读增量快照读
            MongoDB-CDC
            mysql-cdc
            Oracle-CDC
            Postgres-CDC
            sqlserver-cdc
            Oceanbase-CDC
            TiDB-CDC
            db2-cdc
            vitess-cdc

            7.用法实例

            7.1DataStream API 的用法(推荐)

            请严格按照上面的《5.支持的 Flink 版本》搭配来使用Flink CDC

            
            	1.13.0
            	1.8
            	1.8
            
            
            	com.ververica
            	flink-connector-mysql-cdc
            	${flinkcdc.version}
            
            
            
            	org.apache.flink
            	flink-clients_2.12
            	${flink.version}
            
            
            	org.apache.flink
            	flink-java
            	${flink.version}
            
            
            	org.apache.flink
            	flink-scala_2.12
            	${flink.version}
            
            
            	org.apache.flink
            	flink-streaming-java_2.12
            	${flink.version}
            
            
            	org.apache.flink
            	flink-streaming-scala_2.12
            	${flink.version}
            
            
            	org.apache.flink
            	flink-table-common
            	${flink.version}
            
            
            	org.apache.flink
            	flink-table-planner-blink_2.12
            	${flink.version}
            
            
            	org.apache.flink
            	flink-table-api-java-bridge_2.12
            	${flink.version}
            
            

            请提前开启MySQL中的binlog,配置my.cnf文件,重启mysqld服务即可

            my.cnf

            [client]
            default_character_set=utf8
            [mysqld]
            server-id=1
            collation_server=utf8_general_ci
            character_set_server=utf8
            log-bin=mysql-bin
            binlog_format=row
            expire_logs_days=30
            

            ddl&dml.sql

            create table test_cdc
            (
                id   int          not null
                    primary key,
                name varchar(100) null,
                age  int          null
            );
            INSERT INTO flink.test_cdc (id, name, age) VALUES (1, 'Daniel', 25);
            INSERT INTO flink.test_cdc (id, name, age) VALUES (2, 'David', 38);
            INSERT INTO flink.test_cdc (id, name, age) VALUES (3, 'James', 16);
            INSERT INTO flink.test_cdc (id, name, age) VALUES (4, 'Robert', 27);
            

            FlinkDSCDC.java

            package com.daniel.util;
            import com.ververica.cdc.connectors.mysql.MySqlSource;
            import com.ververica.cdc.connectors.mysql.table.StartupOptions;
            import com.ververica.cdc.debezium.DebeziumSourceFunction;
            import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
            import org.apache.flink.streaming.api.datastream.DataStreamSource;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            /**
             * @Author Daniel
             * @Date: 2023/7/25 10:03
             * @Description DataStream API CDC
             **/
            public class FlinkDSCDC {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    DebeziumSourceFunction sourceFunction = MySqlSource.builder()
                            .hostname("localhost")
                            .port(3306)
                            .username("root")
                            .password("123456")
                            .databaseList("flink")
                            // 这里一定要是db.table的形式
                            .tableList("flink.test_cdc")
                            .deserializer(new StringDebeziumDeserializationSchema())
                            .startupOptions(StartupOptions.initial())
                            .build();
                    DataStreamSource dataStreamSource = env.addSource(sourceFunction);
                    dataStreamSource.print();
                    env.execute("FlinkDSCDC");
                }
            }
            
            UPDATE flink.test_cdc t SET t.age = 24 WHERE t.id = 1;
            UPDATE flink.test_cdc t SET t.name = 'Andy' WHERE t.id = 3;
            

            打印出的日志

            SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=1,name=Daniel,age=25},after=Struct{id=1,name=Daniel,age=24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=7989,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
            SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1690272544, file=mysql-bin.000001, pos=7860, row=1, server_id=1, event=4}} ConnectRecord{topic='mysql_binlog_source.flink.test_cdc', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, value=Struct{before=Struct{id=3,name=James,age=16},after=Struct{id=3,name=Andy,age=16},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1690272544000,db=flink,table=test_cdc,server_id=1,file=mysql-bin.000001,pos=8113,row=0},op=u,ts_ms=1690272544122}, valueSchema=Schema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
            

            可以得出的结论:

            1. 日志中的数据变化操作类型(op)可以表示为 ‘u’,表示更新操作。在第一条日志中,发生了一个更新操作,对应的记录的 key 是 id=1,更新前的数据是 {id=1, name=Daniel, age=25},更新后的数据是 {id=1, name=Daniel, age=24}。在第二条日志中,也发生了一个更新操作,对应的记录的 key 是 id=3,更新前的数据是 {id=3, name=James, age=16},更新后的数据是 {id=3, name=Andy, age=16}。
            2. 每条日志还提供了其他元数据信息,如数据源(source)、版本号(version)、连接器名称(connector)、时间戳(ts_ms)等。这些信息可以帮助我们追踪记录的来源和处理过程。
            3. 日志中的 sourceOffset 包含了一些关键信息,如事务ID(transaction_id)、文件名(file)、偏移位置(pos)等。这些信息可以用于确保数据的准确顺序和一致性。
            7.2Table/SQL API的用法

            FlinkSQLCDC.java

            package com.daniel.util;
            import org.apache.flink.api.java.tuple.Tuple2;
            import org.apache.flink.streaming.api.datastream.DataStream;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.table.api.Table;
            import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
            import org.apache.flink.types.Row;
            /**
             * @Author Daniel
             * @Date: 2023/7/25 15:25
             * @Description
             **/
            public class FlinkSQLCDC {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                    tableEnv.executeSql("CREATE TABLE test_cdc (" +
                            " id int primary key," +
                            " name STRING," +
                            " age int" +
                            ") WITH (" +
                            " 'connector' = 'mysql-cdc'," +
                            " 'scan.startup.mode' = 'latest-offset'," +
                            " 'hostname' = 'localhost'," +
                            " 'port' = '3306'," +
                            " 'username' = 'root'," +
                            " 'password' = '123456'," +
                            " 'database-name' = 'flink'," +
                            " 'table-name' = 'test_cdc'" +
                            ")");
                    Table table = tableEnv.sqlQuery("select * from test_cdc");
                    DataStream> dataStreamSource = tableEnv.toRetractStream(table, Row.class);
                    dataStreamSource.print();
                    env.execute("FlinkSQLCDC");
                }
            }
            
            UPDATE flink.test_cdc t SET t.age = 55 WHERE t.id = 2;
            UPDATE flink.test_cdc t SET t.age = 22 WHERE t.id = 3;
            UPDATE flink.test_cdc t SET t.name = 'Alice' WHERE t.id = 4;
            UPDATE flink.test_cdc t SET t.age = 18 WHERE t.id = 1;
            INSERT INTO flink.test_cdc (id, name, age) VALUES (5, 'David', 29);
            

            打印出的日志

            (false,-U[2, David, 38])
            (true,+U[2, David, 55])
            (false,-U[3, Andy, 16])
            (true,+U[3, Andy, 22])
            (false,-U[4, Robert, 27])
            (true,+U[4, Alice, 27])
            (false,-U[1, Daniel, 24])
            (true,+U[1, Daniel, 18])
            (true,+I[5, David, 29])
            

网友评论

搜索
最新文章
热门文章
热门标签