背景信息
Canal是一个CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式,并支持使用JSON或protobuf序列化消息(Canal默认使用protobuf)。支持Canal格式的连接器有消息队列Kafka和对象存储OSS。
Flink支持将Canal的JSON消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下,利用Canal这个特性非常的有用,例如:
-
将增量数据从数据库同步到其他系统
-
日志审计
-
数据库的实时物化视图
-
数据库表的temporal join变更历史
Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Canal格式的JSON消息,输出到Kafka等存储中。
重要
目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Canal消息。
将Kafka topic注册成Flink表之后,您可以将Canal消息用作变更日志源。
-- 关于MySQL "products" 表的实时物化视图。 -- 计算相同产品的最新平均重量。 SELECT name, AVG(weight) FROM topic_products GROUP BY name; -- 将MySQL "products" 表的所有数据和增量更改同步到Elasticsearch "products" 索引以供将来搜索。 INSERT INTO elasticsearch_products SELECT * FROM topic_products;
配置选项
选项
要求
默认
类型
描述
format
必填
(none)
String
指定要使用的格式,使用Canal格式时,参数取值为canal-json。
canal-json.ignore-parse-errors
选填
false
Boolean
参数取值如下:
-
true:当解析异常时,跳过当前字段或行。
-
false(默认值):报出错误,作业启动失败。
canal-json.timestamp-format.standard
选填
SQL
String
指定输入和输出时间戳格式。参数取值如下:
-
SQL:解析yyyy-MM-dd HH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30 12:13:14.123,并以相同格式输出时间戳。
-
ISO-8601:解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30T12:13:14.123,并以相同的格式输出时间戳。
canal-json.map-null-key.mode
选填
FAIL
String
指定处理Map中key值为空的方法。参数取值如下:
-
FAIL:在Map中key值为空的时候抛出异常。
-
DROP:丢弃Map中key值为空的数据项。
-
LITERAL:使用字符串常量来替换Map中的空key值。字符串常量的值由canal-json.map-null-key.literal定义。
canal-json.map-null-key.literal
选填
null
String
当canal-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。
canal-json.encode.decimal-as-plain-number
选填
false
Boolean
参数取值如下:
-
true:所有DECIMAL类型的数据保持原状,不使用科学计数法表示,例如0.000000027表示为0.000000027。
-
false:所有DECIMAL类型的数据,使用科学计数法表示,例如0.000000027表示为2.7E-8。
canal-json.database.include
选填
(none)
String
一个可选的正则表达式,通过正则匹配Canal记录中的database元字段,仅读取指定数据库的changelog记录。正则字符串与Java的Pattern兼容。
canal-json.table.include
选填
(none)
String
一个可选的正则表达式,通过正则匹配Canal记录中的table元字段,仅读取指定表的changelog记录。正则字符串与Java的Pattern兼容。
类型映射
目前,Canal使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参阅JSON Format。Canal格式额外兼容了数据传输服务DTS在Kafka集群存储使用的Canal扩展变更类型(INIT)。请参见Kafka集群的数据存储格式。
其他使用说明
可用的元数据
下面的格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。
重要
格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。
键
数据类型
说明
database
STRING NULL
原始数据库。对应于Canal记录中的database字段。
table
STRING NULL
原始数据库的表。对应于Canal记录中的table字段。
sql-type
MAP
NULL 各种sql类型的映射。对应于Canal记录中的sqlType字段。
pk-names
ARRAY
NULL 主键名称数组。对应于Canal记录中的pkNames字段。
ingestion-timestamp
TIMESTAMP_LTZ(3) NULL
连接器处理事件时的时间戳。对应于Canal记录中的ts字段。
如何在Kafka中访问Canal元数据字段的代码示例如下。
CREATE TABLE KafkaTable ( origin_database STRING METADATA FROM 'value.database' VIRTUAL, origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP
METADATA FROM 'value.sql-type' VIRTUAL, origin_pk_names ARRAY METADATA FROM 'value.pk-names' VIRTUAL, origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, user_id BIGINT, item_id BIGINT, behavior STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'value.format' = 'canal-json' ); 常见问题
故障时投递重复的变更事件
在正常的操作环境下,Canal能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Canal产生的变更事件。在非正常情况下(例如有故障发生),Canal只能保证at-least-once的投递语义。此时,Canal可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该source上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。
参考:Canal格式的使用方法和类型映射_实时计算 Flink版(Flink)-阿里云帮助中心
-
猜你喜欢
- 13小时前部署YUM仓库及NFS共享存储
- 13小时前【计算机毕设选题】基于大数据的股票量化分析与股价预测系统
- 13小时前项目分享:基于大数据的股票数据分析系统设计与实现
- 13小时前数据湖架构Hudi(二)Hudi版本0.12源码编译、Hudi集成spark、使用IDEA与spark对hudi表增删改查
- 13小时前Java 栈和队列的交互实现
- 13小时前3D Gaussian Splatting:用于实时的辐射场渲染
- 11小时前准备好了吗英文(准备好了吗英文咋说)
- 8小时前辣椒种子催芽方法(辣椒种子催芽方法图解)
- 4小时前xp3用什么模拟器打开(xp模拟器怎么用)
- 2小时前微信怎么发说说(微信怎么发说说只含文字)
网友评论
- 搜索
- 最新文章
- 热门文章