Table of Contents
Preface
1. Spring Boot dependencies
2. YML configuration file
3. Creating the SQL Server CDC change-data listener
4. Deserializing data into a change JSON object
5. CDC data entity class
6. Custom ApplicationContextUtil
7. Custom sink managed by Spring to process change data
Preface
My scenario required capturing incremental data from specific tables in a SQL Server database. After evaluating many approaches, I settled on Flink's flink-connector-sqlserver-cdc connector, which relies on SQL Server's CDC (Change Data Capture) feature to read the incremental data. The database must be configured for CDC before any data can be processed; if you are unsure how to do that, see my article 《SQL Server数据库开启CDC变更数据捕获操作指引》.
Without further ado, here is the working setup. If anything is lacking, corrections are welcome.
1. Spring Boot dependencies
<properties>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>
2. YML configuration file
spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# Configuration for real-time sync of the SQL Server database
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa
3. Creating the SQL Server CDC change-data listener
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL Server CDC change-data listener
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC data source configuration
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting Flink CDC to capture ERP change data...");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * Build the CDC source.
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database)   // the SQL Server database to monitor
                .tableList(tables)    // the tables to monitor, e.g. dbo.t1
                .username(username)
                .password(password)
                /*
                 * initial: take a full snapshot first, then read incremental changes
                 * latest:  read incremental changes only (no historical data)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts each SourceRecord to a DataChangeInfo
                .build();
    }
}
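One optional hardening step that is not part of the original code: with the setup above, the job restarts from StartupOptions.latest() after a failure. If you need the source to resume from where it left off, you can enable Flink checkpointing before env.execute(...) so the Debezium offsets are persisted. A minimal sketch, with an assumed 10-second interval:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Snapshot operator state (including the CDC source's offsets) every 10s;
        // the interval is an assumption, tune it for your workload
        env.enableCheckpointing(10_000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    }
}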
4. Deserializing data into a change JSON object
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * Custom deserialization schema for SQL Server CDC messages
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the raw record into a change object.
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Get the operation type (CREATE/UPDATE/DELETE): 1 insert, 2 update, 3 delete
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the change object downstream
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server CDC message: {}", e.getMessage(), e);
        }
    }

    /**
     * Extract the before- or after-image of the row from the record.
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
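One subtlety worth calling out: the Debezium SQL Server connector names record topics "<server-name>.<schema>.<table>", so the middle segment that the code stores in database is, strictly speaking, the schema name (typically dbo). A tiny sketch to make the split above concrete; the topic value is illustrative:

public class TopicParseExample {
    public static void main(String[] args) {
        // Illustrative topic as emitted by the Debezium SQL Server connector
        String topic = "server1.dbo.t1";
        String[] fields = topic.split("\\.");
        System.out.println(fields[1]); // "dbo" -> stored in the "database" field above
        System.out.println(fields[2]); // "t1"  -> the table name
    }
}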
5. CDC data entity class
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC data entity
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * Row data before the change
     */
    private String beforeData;
    /**
     * Row data after the change
     */
    private String afterData;
}
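To make the shape of the payload concrete, here is an illustrative example of what the schema from section 4 would emit for an UPDATE; the table and column names are made up:

import com.alibaba.fastjson.JSONObject;

import java.time.LocalDateTime;

public class DataChangeInfoExample {
    public static void main(String[] args) {
        // A hypothetical UPDATE on dbo.t1 that changes the "name" column
        DataChangeInfo info = new DataChangeInfo();
        info.setDatabase("HM_5001");
        info.setTableName("t1");
        info.setEventType(2); // 2 = update
        info.setChangeTime(LocalDateTime.now());
        info.setBeforeData(new JSONObject().fluentPut("id", 1).fluentPut("name", "old").toJSONString());
        info.setAfterData(new JSONObject().fluentPut("id", 1).fluentPut("name", "new").toJSONString());
        System.out.println(info); // lombok @Data generates toString()
    }
}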
6. Custom ApplicationContextUtil
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

    /**
     * The Spring application context
     */
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}
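Because getBean() is generic, callers get a typed bean with no cast, from any thread, once the context has been set. A usage sketch, fetching the sink bean from section 7:

// Typed lookup from anywhere, e.g. inside a Flink operator thread
DataChangeSink sink = ApplicationContextUtil.getBean(DataChangeSink.class);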
7. Custom sink managed by Spring to process change data
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * Custom sink, managed by Spring, that processes the change data
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private transient UserMapper userMapper;

    /**
     * Fetch Spring-managed beans in open().
     * When the Spring Boot application starts, the Spring context is loaded and
     * @Autowired injection works on Spring-managed threads. Flink, however, runs
     * its operators on its own threads, where the Spring context is not directly
     * available, so the bean is looked up from the context in the sink's open()
     * method instead.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO: process your data here
    }
}
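To go one step further, here is a minimal sketch of what the TODO might become: dispatch on eventType and map the JSON payload back to an entity. The User class and the UserMapper methods (insert, updateById, deleteById) are assumptions standing in for your own entity and mapper, not part of the original code:

import com.alibaba.fastjson.JSONObject;

public class ChangeDataHandler {

    private final UserMapper userMapper; // assumed mapper, e.g. a MyBatis mapper

    public ChangeDataHandler(UserMapper userMapper) {
        this.userMapper = userMapper;
    }

    /** Dispatch one change event to the matching persistence call. */
    public void handle(DataChangeInfo info) {
        switch (info.getEventType()) {
            case 1: // insert: the new row is in afterData
                userMapper.insert(JSONObject.parseObject(info.getAfterData(), User.class));
                break;
            case 2: // update: the new values are in afterData
                userMapper.updateById(JSONObject.parseObject(info.getAfterData(), User.class));
                break;
            case 3: // delete: only beforeData carries the row
                User deleted = JSONObject.parseObject(info.getBeforeData(), User.class);
                userMapper.deleteById(deleted.getId());
                break;
            default:
                break; // ignore anything unexpected
        }
    }
}

Whether you upsert into another database, push to a message queue, or update a cache, the dispatch skeleton stays the same; only the calls inside each case change.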
The above is what I have personally verified and tested, and it has been released to production. If anything is lacking, corrections are welcome.