上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

guduadmin301月前

目录

 

1. 背景

2. Windows系统安装canal

3.Mysql准备工作

4. 公共依赖包

5. Redis缓存设计

6. mall-canal-service


 

1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:

  • 数据实时备份
  • 异构数据源(elasticsearch、Hbase)与数据库数据增量同步
  • 业务缓存cache 刷新,保证缓存一致性
  • 带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

    canal是借助于MySQL主从复制原理实现

    Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性,08456dcd79f140de9e945633bc447524.png,第1张

    • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
    • slave将master的binary log events拷贝到它的中继日志(relay log);
    • slave重做中继日志中的事件,将改变反映它自己的数据。

     canal的工作原理:

    Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性,8ca49d5857924291a2de84e733e4f296.png,第2张

     

    • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
    • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
    • canal解析binary log对象(原始为byte流)

      2. Windows系统安装canal

      Github下载链接:Releases · alibaba/canal · GitHub

       Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性,77301713faec4dc1bd7554fe229c78f2.png,第3张

       解压

      Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性,e31d0176c7e94ba19fb6e6b6e67af614.png,第4张

       进入/conf/example,修改instance.properties

      # position info
      canal.instance.master.address=127.0.0.1:3306
      canal.instance.master.journal.name=
      canal.instance.master.position=
      canal.instance.master.timestamp=
      canal.instance.master.gtid=
      # rds oss binlog
      canal.instance.rds.accesskey=
      canal.instance.rds.secretkey=
      canal.instance.rds.instanceId=
      # table meta tsdb info
      canal.instance.tsdb.enable=true
      #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
      #canal.instance.tsdb.dbUsername=canal
      #canal.instance.tsdb.dbPassword=canal
      #canal.instance.standby.address =
      #canal.instance.standby.journal.name =
      #canal.instance.standby.position =
      #canal.instance.standby.timestamp =
      #canal.instance.standby.gtid=
      # username/password
      canal.instance.dbUsername=root
      canal.instance.dbPassword=123456
      canal.instance.connectionCharset = UTF-8
      # enable druid Decrypt database password
      canal.instance.enableDruid=false
      #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
      # table regex
      canal.instance.filter.regex=.*\..*
      # table black regex
      canal.instance.filter.black.regex=mysql\.slave_.*
      # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
      # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
      #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
      # mq config
      canal.mq.topic=example
      # dynamic topic route by schema or table regex
      #canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
      canal.mq.partition=0
      # hash partition config
      #canal.mq.partitionsNum=3
      #canal.mq.partitionHash=test.table:id^name,.*\..*
      #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
      #################################################

      之后进入到bin目录下启动startup就可以了。

      3.Mysql准备工作

      /*
      SQLyog Community v13.2.0 (64 bit)
      MySQL - 8.0.33 : Database - shop
      *********************************************************************
      */
      /*!40101 SET NAMES utf8 */;
      /*!40101 SET SQL_MODE=''*/;
      /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
      /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
      /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
      /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
      CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
      USE `shop`;
      /*Table structure for table `brand` */
      DROP TABLE IF EXISTS `brand`;
      CREATE TABLE `brand` (
        `id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',
        `name` VARCHAR(100) NOT NULL COMMENT '品牌名称',
        `image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',
        `initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',
        `sort` INT DEFAULT NULL COMMENT '排序',
        PRIMARY KEY (`id`)
      ) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';
      /*Data for the table `brand` */
      INSERT  INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES 
      (11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
      (12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
      (13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);
      /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
      /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
      /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
      /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
      

      4. 公共依赖包

      
              
              
                  org.springframework.boot
                  spring-boot-starter-web
              
              
              
                  com.baomidou
                  mybatis-plus-boot-starter
                  3.3.2
              
              
              
                  mysql
                  mysql-connector-java
                  runtime
              
              
              
                  org.springframework.boot
                  spring-boot-starter-data-redis
              
              
              
                  com.alibaba.cloud
                  spring-cloud-starter-alibaba-nacos-config
              
              
                  com.alibaba.cloud
                  spring-cloud-starter-alibaba-nacos-discovery
              
          

      5. Redis缓存设计

      这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods

      bootstrap.yaml代码如下:

      server:
        port: 8081
      spring:
        application:
          name: mall-goods
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
          username: root
          password: 123456
        cloud:
          nacos:
            config:
              file-extension: yaml
              server-addr: localhost:8848
            discovery:
              #Nacos的注册地址
              server-addr: localhost:8848
        redis:
          host: xx //改为自己redis ip地址
          port: 6379

      导入RedisConfig配置

      @Configuration
      public class RedisConfig {
          @Bean
          public RedisTemplate redisTemplate(RedisConnectionFactory factory){
              RedisTemplate redisTemplate = new RedisTemplate<>();
              redisTemplate.setConnectionFactory(factory);
              GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
              // 值采用json序列化
              redisTemplate.setValueSerializer(serializer);
              //使用StringRedisSerializer来序列化和反序列化redis的key值
              redisTemplate.setKeySerializer(new StringRedisSerializer());
              // 设置hash key 和value序列化模式
              redisTemplate.setHashKeySerializer(new StringRedisSerializer());
              redisTemplate.setHashValueSerializer(serializer);
              redisTemplate.afterPropertiesSet();
              return redisTemplate;
          }
          @Bean
          public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
              RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
              RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                      .entryTtl(Duration.ofHours(12))//设置默认缓存时间
                      .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
              return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
          }
      }

      对于Brand进行redis缓存设计

      在主启动类上添加@EnableCaching开启缓存。

      @Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。

      @CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。

      @CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。

      代码如下:

      @RestController
      @RequestMapping("/brand")
      public class BrandController {
          @Autowired
          BrandService brandService;
          @GetMapping("/{id}")
          @Cacheable(value = "brand",key = "#id")
          public Brand search(@PathVariable(value = "id")Integer id){
              return brandService.getById(id);
          }
           /****
           * 添加缓存
           */
          @PostMapping
          @CachePut(value = "brand",key = "#brand.id")
          public Brand add(@RequestBody Brand brand){
              return brand;
          }
          /****
           * 修改缓存
           */
          @PutMapping
          @CachePut(value = "brand",key = "#brand.id")
          public Brand update(@RequestBody Brand brand){
              return brand;
          }
          /****
           * 删除缓存
           */
          @DeleteMapping("/{id}")
          @CacheEvict(value = "brand",key = "#id")
          public void delete(@PathVariable(value = "id")Integer id){
          }
      }

      6. mall-canal-service

      这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。

      除公共依赖外导入依赖

           
                  top.javatool
                  canal-spring-boot-starter
                  1.2.1-RELEASE
              
              
                  org.springframework.cloud
                  spring-cloud-starter-openfeign
              
              
                  org.springframework.cloud
                  spring-cloud-loadbalancer
              

      bootstrap.yaml设计如下:

      server:
        port: 8083
      spring:
        application:
          name: mall-canal
        cloud:
          nacos:
            config:
              file-extension: yaml
              server-addr: localhost:8848
            discovery:
              #Nacos的注册地址
              server-addr: localhost:8848
      #Canal配置
      canal:
        server: localhost:11111
        destination: example

      设计Feign接口

      @FeignClient(value = "mall-goods",contextId = "brand")//服务名字
      public interface BrandFeign {
          @GetMapping("/brand/{id}")
          Brand search(@PathVariable(value = "id")Integer id);
          @PostMapping("/brand")
          Brand add(@RequestBody Brand brand);
          /****
           * 修改方法
           */
          @PutMapping("/brand")
           Brand update(@RequestBody Brand brand);
          /****
           * 删除方法
           */
          @DeleteMapping("/brand/{id}")
          void delete(@PathVariable(value = "id")Integer id);
      }
      

      Canal实时监控Mysql数据库变化,代码设计如下:

      @Component
      @CanalTable(value = "brand")
      public class BrandHandler implements EntryHandler {
          @Autowired
          BrandFeign brandFeign;
          @Override
          public void insert(Brand brand) {
              System.out.println(brand);
              brandFeign.add(brand);
          }
          @Override
          public void update(Brand before, Brand after) {
              System.out.println(after);
              brandFeign.update(after);
          }
          @Override
          public void delete(Brand brand) {
              System.out.println(brand);
              brandFeign.delete(brand.getId());
          }
      }

      在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。

      注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。

      	at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]
      	at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]
      	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
      Caused by: java.lang.NumberFormatException: For input string: ""
      	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

      代码运行后,经过测试成功。

       

网友评论

搜索
最新文章
热门文章
热门标签
 
 已婚女人梦见可爱的小女孩  老是做奇怪的梦是怎么回事  梦见吃鱼吃肉