前言
flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用API可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到了不少问题,中途偶然间在群里看到了很久很久以前群友发的一份同步方案,可惜缺少了反序列化的过程,借鉴过来改巴改巴(也改了好几个星期,太菜了),勉强是能跑了,分享出来,能帮到大家一点也就很好了。
方案思路
这个方案的整体思路我先说一下(大佬的思路,我借鉴的):首先使用 MySqlCatalog 获取到各个表的信息(列名、列类型、主键之类的),然后根据这些信息创建相应的 sink table;flinkcdc 的 DataStream 是提供了整库获取数据的能力的,所以我们就采用 DataStream 的方式拿到数据,然后在自定义反序列化里形成 (表名, Row) 形式的 Tuple2,再按表名把流拆分、转换成 Table 并注册成临时视图,最后分别 insert 到对应的 sink 表。
走起:
flink版本:1.15.2(1.15以下版本貌似还没有mysqlcatalog,如果要使用低版本,代码需要调整一下)
flink cdc版本:2.3.0
注意:需先在sink库创建好相应的表(之前忘记写了)。按下面的代码,jdbc 连接器的 'table-name' 填的是 jdbc_sink_<源表名>,所以 sink 库里的物理表要按这个名字来建。
不巴拉了,直接上代码,场景是mysql -> mysql,sink端如果是其他数据库理论上应该是一样,source表需要有主键,这是flinkcdc底层约定好的,没有会报错。
package com.cityos;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* flink cdc 整库同步
*/
public class FlinkCdcMultiSyncJdbc {
private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);
public static void main(String[] args) throws Exception {
// source端连接信息
String userName = "root";
String passWord = "18772247265Ldy@";
String host = "localhost";
String db = "flinktest1";
// 如果是整库,tableList = ".*"
String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3";
int port = 33306;
// sink连接信息模板
String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";
String sink_username = "root";
String sink_password = "18772247265Ldy@";
String connectorWithBody =
" with (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = '${sink_url}',\n" +
" 'username' = '${sink_username}',\n" +
" 'password' = '${sink_password}',\n" +
" 'table-name' = '${tableName}'\n" +
")";
connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)
.replace("${sink_username}", sink_username)
.replace("${sink_password}", sink_password);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注册同步的库对应的catalog
MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port));
List tables = new ArrayList<>();
// 如果整库同步,则从catalog里取所有表,否则从指定表中取表名
if (".*".equals(tableList)) {
tables = mysqlCatalog.listTables(db);
} else {
String[] tableArray = tableList.split(",");
for (String table : tableArray) {
tables.add(table.split("\\.")[1]);
}
}
// 创建表名和对应RowTypeInfo映射的Map
Map tableTypeInformationMap = Maps.newConcurrentMap();
Map tableDataTypesMap = Maps.newConcurrentMap();
Map tableRowTypeMap = Maps.newConcurrentMap();
for (String table : tables) {
// 获取mysql catalog中注册的表
ObjectPath objectPath = new ObjectPath(db, table);
DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
// 获取表的Schema
Schema schema = catalogBaseTable.getUnresolvedSchema();
// 获取表中字段名列表
String[] fieldNames = new String[schema.getColumns().size()];
// 获取DataType
DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];
LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
// 获取表字段类型
TypeInformation>[] fieldTypes = new TypeInformation[schema.getColumns().size()];
// 获取表的主键
List primaryKeys = schema.getPrimaryKey().get().getColumnNames();
for (int i = 0; i < schema.getColumns().size(); i++) {
Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
fieldNames[i] = column.getName();
fieldDataTypes[i] = (DataType) column.getDataType();
fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());
logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
}
RowType rowType = RowType.of(logicalTypes, fieldNames);
tableRowTypeMap.put(table, rowType);
// 组装sink表ddl sql
StringBuilder stmt = new StringBuilder();
String tableName = table;
String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);
stmt.append("create table ").append(jdbcSinkTableName).append("(\n");
for (int i = 0; i < fieldNames.length; i++) {
String column = fieldNames[i];
String fieldDataType = fieldDataTypes[i].toString();
stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");
}
stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));
String formatJdbcSinkWithBody = connectorWithBody
.replace("${tableName}", jdbcSinkTableName);
String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;
// 创建sink表
log.info("createSinkTableDdl: {}", createSinkTableDdl);
tEnv.executeSql(createSinkTableDdl);
tableDataTypesMap.put(tableName, fieldDataTypes);
tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));
}
// 监控mysql binlog
MySqlSource mySqlSource = MySqlSource.>builder()
.hostname(host)
.port(port)
.databaseList(db)
.tableList(tableList)
.username(userName)
.password(passWord)
.deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))
.startupOptions(StartupOptions.initial())
.build();
SingleOutputStreamOperator> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
StatementSet statementSet = tEnv.createStatementSet();
// dataStream转Table,创建临时视图,插入sink表
for (Map.Entry entry : tableTypeInformationMap.entrySet()) {
String tableName = entry.getKey();
RowTypeInfo rowTypeInfo = entry.getValue();
SingleOutputStreamOperator mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);
Table table = tEnv.fromChangelogStream(mapStream);
String temporaryViewName = String.format("t_%s", tableName);
tEnv.createTemporaryView(temporaryViewName, table);
String sinkTableName = String.format("jdbc_sink_%s", tableName);
String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);
log.info("add insertSql for {},sql: {}", tableName, insertSql);
statementSet.addInsertSql(insertSql);
}
statementSet.execute();
}
}
对应的反序列化代码
package com.cityos;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema {
/**
* Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisted of
* physical column values.
*/
private final Map tableRowTypeMap;
private Map physicalConverterMap = Maps.newConcurrentMap();
CustomDebeziumDeserializer(Map tableRowTypeMap) {
this.tableRowTypeMap = tableRowTypeMap;
for (String tablename : this.tableRowTypeMap.keySet()) {
RowType rowType = this.tableRowTypeMap.get(tablename);
DeserializationRuntimeConverter physicalConverter =createNotNullConverter(rowType);
this.physicalConverterMap.put(tablename,physicalConverter);
}
}
@Override
public void deserialize(SourceRecord record, Collector out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Struct source = value.getStruct("source");
String tablename = source.get("table").toString();
DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
Row insert = extractAfterRow(value, valueSchema, physicalConverter);
insert.setKind(RowKind.INSERT);
out.collect(Tuple2.of(tablename,insert));
} else if (op == Envelope.Operation.DELETE) {
Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
delete.setKind(RowKind.DELETE);
out.collect(Tuple2.of(tablename,delete));
} else {
Row before = extractBeforeRow(value, valueSchema, physicalConverter);
before.setKind(RowKind.UPDATE_BEFORE);
out.collect(Tuple2.of(tablename,before));
Row after = extractAfterRow(value, valueSchema, physicalConverter);
after.setKind(RowKind.UPDATE_AFTER);
out.collect(Tuple2.of(tablename,after));
}
}
private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
return (Row) physicalConverter.convert(after, afterSchema);
}
private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
return (Row) physicalConverter.convert(before, beforeSchema);
}
@Override
public TypeInformation> getProducedType() {
return TypeInformation.of(new TypeHint>() {
});
}
public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return null;
}
};
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
case SMALLINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(ZoneId.of("UTC"));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter(
(RowType) type);
case ARRAY:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
return (short) dbzObj == 1;
} else {
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
};
}
private static DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
}
};
}
private static DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000);
case NanoTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000_000);
}
} else if (dbzObj instanceof Integer) {
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
};
}
private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
}
private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(instant, serverTimeZone));
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
};
}
private static DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return StringData.fromString(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
}
private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
final DeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(
logicType ->
createNotNullConverter( logicType))
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Row row = new Row(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
if (field == null) {
row.setField(i, null);
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema);
row.setField(i, convertedField);
}
}
return row;
}
};
}
private static Object convertField(
DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
throws Exception {
if (fieldValue == null) {
return null;
} else {
return fieldConverter.convert(fieldValue, fieldSchema);
}
}
}
再贴上我的pom.xml
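<?xml version="1.0" encoding="UTF-8"?>
<!-- 注:原文中的 XML 标签在转载时丢失,以下标签按 Maven 惯例还原;properties 里的前三个属性名为推测,请以实际工程为准 -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cityos</groupId>
    <artifactId>flink_1_15</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <flink.version>1.15.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>spring</id>
            <url>https://maven.aliyun.com/repository/spring</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.cityos.Flink1142Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>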
有兴趣的看看,没兴趣的或者感觉不屑的划过就好,莫喷我,代码写的确实是丑。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章
