上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink CDC整库同步(多表异构同步)

guduadmin11天前

前言

flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用API可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到了不少问题,中途偶然间在群里看到了很久很久以前群友发的一份同步方案,可惜缺少了反序列化的过程,借鉴过来改巴改巴(也改了好几个星期,太菜了),勉强是能跑了,分享出来,能帮到大家一点也就很好了。

方案思路

这个方案的整体思路我先说一下(大佬的思路,我借鉴的),首先我们先使用mysqlcatalog获取到各个表的信息(列名、列类型之类的),然后创建相应的sink table,然后flinkcdc的DataStream是提供了整库获取数据的能力的,所以我们就采用DataStream的方式拿到数据,然后在自定义反序列化里形成的输出,得到DataStream<,然后根据tableName将这个流拆分(过滤),就相当于一个tablename对应一个自己的DataStream,然后将每个流转为一个sourcetable,然后insert into sinktable select * from sourcetable,然后…gameover。

走起:

flink版本:1.15.2(1.15以下版本貌似还没有mysqlcatalog,如果要使用低版本,代码需要调整一下)

flink cdc版本:2.3.0

注意:需先在sink库创建好相应的表(之前忘记写了)

不巴拉了,直接上代码,场景是mysql -> mysql,sink端如果是其他数据库理论上应该是一样,source表需要有主键,这是flinkcdc底层约定好的,没有会报错。

package com.cityos;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * flink cdc 整库同步
 */
public class FlinkCdcMultiSyncJdbc {
    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);
    public static void main(String[] args) throws Exception {
       // source端连接信息
        String userName = "root";
        String passWord = "18772247265Ldy@";
        String host = "localhost";
        String db = "flinktest1";
       // 如果是整库,tableList = ".*"
        String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3";
        int port = 33306;
       // sink连接信息模板
        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";
        String sink_username = "root";
        String sink_password = "18772247265Ldy@";
        String connectorWithBody =
                " with (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = '${sink_url}',\n" +
                        " 'username' = '${sink_username}',\n" +
                        " 'password' = '${sink_password}',\n" +
                        " 'table-name' = '${tableName}'\n" +
                        ")";
        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)
                .replace("${sink_username}", sink_username)
                .replace("${sink_password}", sink_password);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // 注册同步的库对应的catalog
        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port));
        List tables = new ArrayList<>();
       // 如果整库同步,则从catalog里取所有表,否则从指定表中取表名
        if (".*".equals(tableList)) {
            tables = mysqlCatalog.listTables(db);
        } else {
            String[] tableArray = tableList.split(",");
            for (String table : tableArray) {
                tables.add(table.split("\\.")[1]);
            }
        }
       // 创建表名和对应RowTypeInfo映射的Map
        Map tableTypeInformationMap = Maps.newConcurrentMap();
        Map tableDataTypesMap = Maps.newConcurrentMap();
        Map tableRowTypeMap = Maps.newConcurrentMap();
        for (String table : tables) {
            // 获取mysql catalog中注册的表
            ObjectPath objectPath = new ObjectPath(db, table);
            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            // 获取表的Schema
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // 获取表中字段名列表
            String[] fieldNames = new String[schema.getColumns().size()];
            // 获取DataType
            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            // 获取表字段类型
            TypeInformation[] fieldTypes = new TypeInformation[schema.getColumns().size()];
            // 获取表的主键
            List primaryKeys = schema.getPrimaryKey().get().getColumnNames();
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                fieldDataTypes[i] = (DataType) column.getDataType();
                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
            }
            RowType rowType = RowType.of(logicalTypes, fieldNames);
            tableRowTypeMap.put(table, rowType);
            // 组装sink表ddl sql
            StringBuilder stmt = new StringBuilder();
            String tableName = table;
            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);
            stmt.append("create table ").append(jdbcSinkTableName).append("(\n");
            for (int i = 0; i < fieldNames.length; i++) {
                String column = fieldNames[i];
                String fieldDataType = fieldDataTypes[i].toString();
                stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");
            }
            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));
            String formatJdbcSinkWithBody = connectorWithBody
                    .replace("${tableName}", jdbcSinkTableName);
            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;
            // 创建sink表
            log.info("createSinkTableDdl: {}", createSinkTableDdl);
            tEnv.executeSql(createSinkTableDdl);
            tableDataTypesMap.put(tableName, fieldDataTypes);
            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));
        }
       // 监控mysql binlog
        MySqlSource mySqlSource = MySqlSource.>builder()
                .hostname(host)
                .port(port)
                .databaseList(db)
                .tableList(tableList)
                .username(userName)
                .password(passWord)
                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))
                .startupOptions(StartupOptions.initial())
                .build();
        SingleOutputStreamOperator> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
        StatementSet statementSet = tEnv.createStatementSet();
        // dataStream转Table,创建临时视图,插入sink表
        for (Map.Entry entry : tableTypeInformationMap.entrySet()) {
            String tableName = entry.getKey();
            RowTypeInfo rowTypeInfo = entry.getValue();
            SingleOutputStreamOperator mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);
            Table table = tEnv.fromChangelogStream(mapStream);
            String temporaryViewName = String.format("t_%s", tableName);
            tEnv.createTemporaryView(temporaryViewName, table);
            String sinkTableName = String.format("jdbc_sink_%s", tableName);
            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);
            log.info("add insertSql for {},sql: {}", tableName, insertSql);
            statementSet.addInsertSql(insertSql);
        }
        statementSet.execute();
    }
}

对应的反序列化代码

package com.cityos;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;
public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema {
    /**
     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisted of
     * physical column values.
     */
    private final Map tableRowTypeMap;
    private Map physicalConverterMap = Maps.newConcurrentMap();
    CustomDebeziumDeserializer(Map tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        for (String tablename : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tablename);
            DeserializationRuntimeConverter physicalConverter =createNotNullConverter(rowType);
            this.physicalConverterMap.put(tablename,physicalConverter);
        }
    }
    @Override
    public void deserialize(SourceRecord record, Collector out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        Struct source = value.getStruct("source");
        String tablename = source.get("table").toString();
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(value, valueSchema, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tablename,insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tablename,delete));
        } else {
            Row before = extractBeforeRow(value, valueSchema, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tablename,before));
            Row after = extractAfterRow(value, valueSchema, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tablename,after));
        }
    }
    private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }
    private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }
    @Override
    public TypeInformation> getProducedType() {
        return TypeInformation.of(new TypeHint>() {
        });
    }
    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return null;
                    }
                };
            case BOOLEAN:
                return convertToBoolean();
            case TINYINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Byte.parseByte(dbzObj.toString());
                    }
                };
            case SMALLINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Short.parseShort(dbzObj.toString());
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter(
                        (RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal =
                                VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
            }
        };
    }
    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
            }
        };
    }
    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000);
                        case NanoTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000_000);
                    }
                } else if (dbzObj instanceof Integer) {
                    return dbzObj;
                }
                // get number of milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
            }
        };
    }
    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }
    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
            ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
        };
    }
    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }
    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(
                                logicType ->
                                        createNotNullConverter( logicType))
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;
            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Row row = new Row(arity);
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    if (field == null) {
                        row.setField(i, null);
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField =
                                convertField(fieldConverters[i], fieldValue, fieldSchema);
                        row.setField(i, convertedField);
                    }
                }
                return row;
            }
        };
    }
    private static Object convertField(
            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
            throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}

再贴上我的pom.xml



    4.0.0
    com.cityos
    flink_1_15
    1.0-SNAPSHOT
    
        1.8
        UTF-8
        UTF-8
        2.3.7.RELEASE
        1.15.2
        2.12
        
    
    
        
            scala-tools.org
            Scala-Tools Maven2 Repository
            http://scala-tools.org/repo-releases
        
        
            spring
            https://maven.aliyun.com/repository/spring
        
        
            cloudera
            https://repository.cloudera.com/artifactory/cloudera-repos/
        
    
    
        
            org.apache.flink
            flink-scala_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner_${scala.binary.version}
            ${flink.version}
            
        
        
            org.apache.flink
            flink-table-api-scala-bridge_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-table-common
            ${flink.version}
            
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
        
            org.apache.flink
            flink-connector-kafka
            ${flink.version}
        
        
        
            org.apache.flink
            flink-connector-jdbc
            ${flink.version}
        
        
        
            com.ververica
            flink-connector-mysql-cdc
            2.3.0
        
        
        
            mysql
            mysql-connector-java
            8.0.29
        
        
            org.apache.flink
            flink-json
            ${flink.version}
        
        
            org.apache.flink
            flink-csv
            ${flink.version}
        
        
        
            org.slf4j
            slf4j-log4j12
            1.7.21
            compile
        
        
        
            log4j
            log4j
            1.2.17
        
    
    
        
            
                org.springframework.boot
                spring-boot-dependencies
                ${spring-boot.version}
                pom
                import
            
        
    
    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.1
                
                    1.8
                    1.8
                    UTF-8
                
            
            
                org.springframework.boot
                spring-boot-maven-plugin
                2.3.7.RELEASE
                
                    com.cityos.Flink1142Application
                
                
                    
                        repackage
                        
                            repackage
                        
                    
                
            
        
    

有兴趣的看看,没兴趣的或者感觉不屑的划过就好,莫喷我,代码写的确实是丑。

网友评论

搜索
最新文章
热门文章
热门标签