Contents
Preface
1. Spring Boot dependencies
2. YAML configuration
3. Creating the SQL Server CDC change data listener
4. Deserializing the data into a change JSON object
5. The CDC data entity class
6. A custom ApplicationContextUtil
7. A custom Spring-managed sink to process the change data
Preface
My scenario: capture incremental data from specific tables in a SQL Server database. After evaluating many approaches, I settled on Flink's flink-connector-sqlserver-cdc. It relies on SQL Server's CDC (Change Data Capture) feature to obtain the incremental data, so the database has to be configured before any of the code below will work. If you are unsure how to set that up, see my earlier article "SQL Server数据库开启CDC变更数据捕获操作指引" (a guide to enabling CDC on SQL Server).
Without further ado, here is the working setup. Corrections are welcome.
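Before wiring up Flink, it is worth a quick sanity check that CDC is really enabled on the database and on the target tables. Below is a minimal sketch over plain JDBC; the class name CdcPrecheck is mine, the connection details are the ones from the configuration in section 2, and the queries use SQL Server's built-in is_cdc_enabled and is_tracked_by_cdc flags.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcPrecheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001";
        try (Connection conn = DriverManager.getConnection(url, "sa", "sa");
             Statement st = conn.createStatement()) {
            // Is CDC enabled at the database level?
            try (ResultSet rs = st.executeQuery(
                    "SELECT is_cdc_enabled FROM sys.databases WHERE name = 'HM_5001'")) {
                if (rs.next()) {
                    System.out.println("database CDC enabled: " + rs.getBoolean(1));
                }
            }
            // Which tables are currently tracked by CDC?
            try (ResultSet rs = st.executeQuery(
                    "SELECT name FROM sys.tables WHERE is_tracked_by_cdc = 1")) {
                while (rs.next()) {
                    System.out.println("tracked table: " + rs.getString(1));
                }
            }
        }
    }
}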
1. Spring Boot dependencies
<properties>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>
2. YAML configuration
spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# Real-time SQL Server sync (CDC) configuration
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa
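The listener in the next section injects these values one by one with @Value. As an aside, the same block could be bound in a single typed class with @ConfigurationProperties; here is a minimal sketch, assuming Spring Boot's relaxed binding maps the CDC.DataSource keys onto the cdc.data-source prefix (the class name CdcDataSourceProperties is mine, not part of the original project):

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the whole CDC.DataSource.* block from application.yml in one place.
@Data
@Component
@ConfigurationProperties(prefix = "cdc.data-source")
public class CdcDataSourceProperties {
    private String host;
    private int port;
    private String database;
    private String tableList;
    private String username;
    private String password;
}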
3. Creating the SQL Server CDC change data listener
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL Server CDC change data listener
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC data source configuration
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting Flink CDC to capture ERP change data......");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * Build the CDC source
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database) // the SQL Server database to monitor
                .tableList(tables) // the tables to monitor
                .username(username)
                .password(password)
                /*
                 * initial: take a full snapshot first, then read increments as they are written
                 * latest: read increments only (no historical data)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts each SourceRecord into a DataChangeInfo
                .build();
    }
}
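Note that the example above has no checkpointing, so after a failure the job restarts according to the configured startup options rather than resuming from the last read position. A minimal sketch of enabling it, to be placed inside run() before env.execute() (the 3-second interval is an arbitrary illustrative value; the extra import needed is org.apache.flink.streaming.api.CheckpointingMode):

// Checkpoint every 3 seconds so the CDC source's offsets are stored in Flink
// state and the job can resume from them after a restart.
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);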
4. Deserializing the data into a change JSON object
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * Custom deserializer for SQL Server change messages
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the raw record into a change JSON object
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Map the operation type: CREATE -> 1 (insert), UPDATE -> 2 (update), otherwise 3 (delete)
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the assembled change record
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server change message: {}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Extract the before- or after-image of the row from the raw record
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
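Downstream consumers can turn the beforeData/afterData strings back into objects with fastjson. A minimal sketch (the column name "id" is hypothetical; substitute your own table's columns):

// Somewhere downstream of the deserializer, e.g. in the sink:
JSONObject after = JSONObject.parseObject(dataChangeInfo.getAfterData());
// Read a column value by name; "id" is a hypothetical column here.
Long id = after.getLong("id");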
5. The CDC data entity class
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC data entity class
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
}
6. A custom ApplicationContextUtil
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {

    /**
     * The Spring application context
     */
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}
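With this in place, any code that Spring did not instantiate, such as Flink operators running on Flink's own threads, can fetch beans manually. This is exactly how the sink in the next section obtains its mapper:

// Look up a Spring-managed bean from non-Spring-managed code.
UserMapper userMapper = ApplicationContextUtil.getBean(UserMapper.class);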
7. A custom Spring-managed sink to process the change data
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * Custom sink managed by Spring
 * Processes the change data
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper;

    /**
     * Fetch Spring-managed beans from the container in open().
     * When the Spring Boot application starts, the Spring container is loaded and
     * @Autowired works anywhere on Spring's own threads. Flink, however, runs its
     * operators on separate threads by default, where the Spring container is not
     * visible, so @Autowired cannot be used here. Instead, the beans are looked up
     * in the custom sink's open() method via ApplicationContextUtil.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO process your data here
    }
}
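To make the TODO concrete, here is a minimal sketch of what invoke() might do with the change record. It assumes UserMapper is a MyBatis-Plus BaseMapper<User> and that the monitored table maps onto a hypothetical User entity; insert, updateById and deleteById are the standard BaseMapper methods, and everything else here is illustrative:

@Override
public void invoke(DataChangeInfo dataChangeInfo, Context context) {
    log.info("Received raw change data: {}", dataChangeInfo);
    switch (dataChangeInfo.getEventType()) {
        case 1: // insert: the new row is in afterData
            userMapper.insert(JSONObject.parseObject(dataChangeInfo.getAfterData(), User.class));
            break;
        case 2: // update: the new state of the row is in afterData
            userMapper.updateById(JSONObject.parseObject(dataChangeInfo.getAfterData(), User.class));
            break;
        case 3: // delete: only beforeData carries the row, so take the key from there
            User old = JSONObject.parseObject(dataChangeInfo.getBeforeData(), User.class);
            userMapper.deleteById(old.getId());
            break;
        default:
            break;
    }
}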
Everything above is what I personally verified and tested, and it is already running in production. Corrections are welcome.