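Preface
Single-table sync with Flink CDC is fairly simple; following the official examples basically always works. Multi-table heterogeneous sync and whole-database sync are things I had long wanted to try. The community says it can be done with the API, but I never found a ready-made working solution (code) to borrow, so I tried it myself. Ahem, unfortunately my skills are weak and I don't fully understand a lot of Java syntax, so it took quite a long time and I hit plenty of problems along the way. At one point I stumbled on a sync solution that a member of a chat group had posted a long, long time ago; sadly it was missing the deserialization part. I borrowed it and patched it up (that also took several weeks, I really am that bad), and it now barely runs. I'm sharing it here; if it helps anyone even a little, that's great.
Approach
Let me first describe the overall idea (it is the expert's idea; I borrowed it). First, use MySqlCatalog to get each table's metadata (column names, column types, and so on) and create the corresponding sink tables. Flink CDC's DataStream API can capture changes for a whole database, so we read the data through DataStream and, in a custom deserializer, turn each change record into a (table name, Row) pair.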
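The job later in this post tags every change with its table name and then filters the shared stream once per table. As a rough, Flink-free illustration of that routing idea, here is a minimal sketch in plain Java. `TableRoutingSketch`, `TaggedRow`, and `routeByTable` are hypothetical names made up for this example (they stand in for `Tuple2<String, Row>` and the per-table `filter` calls), and the sketch groups an in-memory list instead of splitting a live stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of "tag every change with its table name, then split per table". */
final class TableRoutingSketch {

    /** A change record tagged with its source table (hypothetical stand-in for Tuple2<String, Row>). */
    static final class TaggedRow {
        final String table;
        final Object[] fields;

        TaggedRow(String table, Object... fields) {
            this.table = table;
            this.fields = fields;
        }
    }

    /** Groups a mixed list of changes by table name, keeping arrival order within each table. */
    static Map<String, List<TaggedRow>> routeByTable(List<TaggedRow> mixed) {
        Map<String, List<TaggedRow>> perTable = new LinkedHashMap<>();
        for (TaggedRow row : mixed) {
            perTable.computeIfAbsent(row.table, t -> new ArrayList<>()).add(row);
        }
        return perTable;
    }

    public static void main(String[] args) {
        // Changes from two tables arrive interleaved, as they would from one shared CDC source.
        List<TaggedRow> mixed = Arrays.asList(
                new TaggedRow("orders", 1, "book"),
                new TaggedRow("users", 7, "alice"),
                new TaggedRow("orders", 2, "pen"));

        for (Map.Entry<String, List<TaggedRow>> e : routeByTable(mixed).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " row(s)");
        }
    }
}
```

Because the table name travels with each row, a single source can serve any number of tables; each downstream branch only has to know its own name and row layout.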
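Let's go:
Flink version: 1.15.2 (versions below 1.15 apparently do not have MySqlCatalog yet; to use a lower version, the code needs some adjustment)
Flink CDC version: 2.3.0
Note: the corresponding tables must be created in the sink database beforehand (I forgot to mention this earlier). As written, the code sets 'table-name' to jdbc_sink_ plus the source table name, so those are the names the sink tables need.
Enough talk, straight to the code. The scenario is MySQL -> MySQL; if the sink is another database, it should in theory work the same way. Source tables must have a primary key; this is a requirement built into Flink CDC, and the job fails without one.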
package com.cityos;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Flink CDC whole-database sync
 */
public class FlinkCdcMultiSyncJdbc {

    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);

    public static void main(String[] args) throws Exception {

        // Source connection settings
        String userName = "root";
        String passWord = "18772247265Ldy@";
        String host = "localhost";
        String db = "flinktest1";
        // For a whole-database sync, set tableList = ".*".
        // Otherwise list the tables as <database>.<table>; the database prefix must match `db`.
        String tableList = "flinktest1.nlp_category,flinktest1.nlp_classify_man_made3";
        int port = 33306;

        // Sink connection template
        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";
        String sink_username = "root";
        String sink_password = "18772247265Ldy@";

        String connectorWithBody =
                " with (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = '${sink_url}',\n" +
                " 'username' = '${sink_username}',\n" +
                " 'password' = '${sink_password}',\n" +
                " 'table-name' = '${tableName}'\n" +
                ")";

        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)
                .replace("${sink_username}", sink_username)
                .replace("${sink_password}", sink_password);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Catalog for the source database
        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord,
                String.format("jdbc:mysql://%s:%d", host, port));
        List<String> tables = new ArrayList<>();

        // Whole-database sync: take every table from the catalog; otherwise use the listed tables
        if (".*".equals(tableList)) {
            tables = mysqlCatalog.listTables(db);
        } else {
            String[] tableArray = tableList.split(",");
            for (String table : tableArray) {
                // Drop the "<database>." prefix
                tables.add(table.split("\\.")[1]);
            }
        }

        // Maps from table name to its RowTypeInfo, DataTypes and RowType
        Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
        Map<String, DataType[]> tableDataTypesMap = Maps.newConcurrentMap();
        Map<String, RowType> tableRowTypeMap = Maps.newConcurrentMap();
        for (String table : tables) {
            // Look up the table in the MySQL catalog
            ObjectPath objectPath = new ObjectPath(db, table);
            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            // Table schema
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // Column names
            String[] fieldNames = new String[schema.getColumns().size()];
            // Column DataTypes and LogicalTypes
            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            // Column type information
            TypeInformation<?>[] fieldTypes = new TypeInformation[schema.getColumns().size()];
            // Primary key columns (get() throws if the table has no primary key)
            List<String> primaryKeys = schema.getPrimaryKey().get().getColumnNames();

            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                fieldDataTypes[i] = (DataType) column.getDataType();
                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
            }
            RowType rowType = RowType.of(logicalTypes, fieldNames);
            tableRowTypeMap.put(table, rowType);

            // Assemble the sink table DDL
            StringBuilder stmt = new StringBuilder();
            String tableName = table;
            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);
            stmt.append("create table ").append(jdbcSinkTableName).append("(\n");

            for (int i = 0; i < fieldNames.length; i++) {
                String column = fieldNames[i];
                String fieldDataType = fieldDataTypes[i].toString();
                stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");
            }
            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));

            // Note: 'table-name' is set to jdbc_sink_<table>, so that is the name
            // the physical table in the sink database must have.
            String formatJdbcSinkWithBody = connectorWithBody
                    .replace("${tableName}", jdbcSinkTableName);
            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;
            // Create the sink table
            log.info("createSinkTableDdl: {}", createSinkTableDdl);
            tEnv.executeSql(createSinkTableDdl);
            tableDataTypesMap.put(tableName, fieldDataTypes);
            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));
        }

        // Read the MySQL binlog
        MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
                .hostname(host)
                .port(port)
                .databaseList(db)
                .tableList(tableList)
                .username(userName)
                .password(passWord)
                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))
                .startupOptions(StartupOptions.initial())
                .build();
        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
        StatementSet statementSet = tEnv.createStatementSet();
        // Per table: DataStream -> Table, register a temporary view, insert into the sink table
        for (Map.Entry<String, RowTypeInfo> entry : tableTypeInformationMap.entrySet()) {
            String tableName = entry.getKey();
            RowTypeInfo rowTypeInfo = entry.getValue();
            SingleOutputStreamOperator<Row> mapStream = dataStreamSource
                    .filter(data -> data.f0.equals(tableName))
                    .map(data -> data.f1, rowTypeInfo);
            Table table = tEnv.fromChangelogStream(mapStream);
            String temporaryViewName = String.format("t_%s", tableName);
            tEnv.createTemporaryView(temporaryViewName, table);
            String sinkTableName = String.format("jdbc_sink_%s", tableName);
            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);
            log.info("add insertSql for {},sql: {}", tableName, insertSql);
            statementSet.addInsertSql(insertSql);
        }
        statementSet.execute();
    }
}
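The sink DDL assembly in the job above is easier to check in isolation. The following is a minimal sketch of the same string building as a standalone helper, with no Flink dependency. `SinkDdlSketch` and `buildSinkDdl` are hypothetical names invented for this example, and the backtick quoting of identifiers is an addition the original code does not have (it can help when a column name is a reserved word).

```java
import java.util.Arrays;
import java.util.List;

/** Standalone sketch of assembling a Flink SQL sink DDL from column metadata. */
final class SinkDdlSketch {

    /**
     * Builds "CREATE TABLE ... ( columns, PRIMARY KEY (...) NOT ENFORCED ) WITH (...)".
     * columns[i] and types[i] describe the i-th column; withClause is appended verbatim.
     */
    static String buildSinkDdl(String sinkTable, String[] columns, String[] types,
                               List<String> primaryKeys, String withClause) {
        if (columns.length != types.length) {
            throw new IllegalArgumentException("columns and types must have the same length");
        }
        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE `").append(sinkTable).append("` (\n");
        for (int i = 0; i < columns.length; i++) {
            ddl.append("  `").append(columns[i]).append("` ").append(types[i]).append(",\n");
        }
        ddl.append("  PRIMARY KEY (");
        for (int i = 0; i < primaryKeys.size(); i++) {
            if (i > 0) {
                ddl.append(", ");
            }
            ddl.append('`').append(primaryKeys.get(i)).append('`');
        }
        ddl.append(") NOT ENFORCED\n) ").append(withClause);
        return ddl.toString();
    }

    public static void main(String[] args) {
        // Hypothetical table used only for illustration.
        String ddl = buildSinkDdl(
                "jdbc_sink_users",
                new String[] {"id", "name"},
                new String[] {"BIGINT NOT NULL", "VARCHAR(64)"},
                Arrays.asList("id"),
                "WITH ('connector' = 'jdbc')");
        System.out.println(ddl);
    }
}
```

Keeping the DDL builder free of Flink types makes it possible to log or unit-test the generated statement before handing it to `tEnv.executeSql`.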
The corresponding deserializer code
package com.cityos;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;

public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {

    private final Map<String, RowType> tableRowTypeMap;

    /**
     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisted of
     * physical column values.
     */
    private Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();

    CustomDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        for (String tablename : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tablename);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tablename, physicalConverter);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        Struct source = value.getStruct("source");
        String tablename = source.get("table").toString();
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(value, valueSchema, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tablename, insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tablename, delete));
        } else {
            Row before = extractBeforeRow(value, valueSchema, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tablename, before));

            Row after = extractAfterRow(value, valueSchema, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tablename, after));
        }
    }

    private Row extractAfterRow(Struct value, Schema valueSchema,
                                DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }

    private Row extractBeforeRow(Struct value, Schema valueSchema,
                                 DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }

    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
        });
    }

    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return null;
                    }
                };
            case BOOLEAN:
                return convertToBoolean();
            case TINYINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Byte.parseByte(dbzObj.toString());
                    }
                };
            case SMALLINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Short.parseShort(dbzObj.toString());
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000);
                        case NanoTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000_000);
                    }
                } else if (dbzObj instanceof Integer) {
                    return dbzObj;
                }
                // get number of milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
            ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(logicType -> createNotNullConverter(logicType))
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Row row = new Row(arity);
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    if (field == null) {
                        row.setField(i, null);
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField =
                                convertField(fieldConverters[i], fieldValue, fieldSchema);
                        row.setField(i, convertedField);
                    }
                }
                return row;
            }
        };
    }

    private static Object convertField(
            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
            throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}
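The heart of `deserialize` is the mapping from a Debezium operation to the changelog rows it emits. Below is a small sketch of just that decision, without any Flink or Debezium classes. `ChangeKindSketch`, `Kind`, and `kindsFor` are hypothetical names for this illustration; the one-letter codes are Debezium's operation codes (`c` create, `r` snapshot read, `u` update, `d` delete). Unlike the deserializer above, which treats every other operation as an update, this sketch rejects unknown codes, which is an assumption about what one might prefer rather than what the original does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of how one Debezium operation fans out into changelog row kinds. */
final class ChangeKindSketch {

    /** Mirrors the four values of Flink's RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Returns the row kinds to emit for a Debezium op code.
     * Creates and snapshot reads become one INSERT (built from "after"),
     * deletes become one DELETE (built from "before"),
     * and updates become an UPDATE_BEFORE / UPDATE_AFTER pair.
     */
    static List<Kind> kindsFor(String op) {
        switch (op) {
            case "c":
            case "r":
                return Collections.singletonList(Kind.INSERT);
            case "d":
                return Collections.singletonList(Kind.DELETE);
            case "u":
                return Arrays.asList(Kind.UPDATE_BEFORE, Kind.UPDATE_AFTER);
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + kindsFor(op));
        }
    }
}
```

Emitting both halves of an update is what lets `fromChangelogStream` treat the stream as a full changelog, so the JDBC sink can apply upserts and deletes by primary key.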
And here is my pom.xml
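<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cityos</groupId>
    <artifactId>flink_1_15</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <flink.version>1.15.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <repository>
            <id>spring</id>
            <url>https://maven.aliyun.com/repository/spring</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.cityos.Flink1142Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>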
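Take a look if you're interested; if you're not, or you think it's beneath you, just scroll past. Please don't flame me; the code really is ugly.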